use crate::{
BM25Config, BM25Index, EmbeddingConfig, MLXBridge, ProviderConfig, SearchOptions, SliceLayer,
SliceMode, compute_content_hash, rag::RAGPipeline, storage::StorageManager,
};
use anyhow::{Result, anyhow};
use serde_json::json;
use std::sync::Arc;
use tokio::sync::Mutex;
async fn try_mlx_bridge() -> Option<Arc<Mutex<MLXBridge>>> {
let config = EmbeddingConfig {
required_dimension: 4096,
max_batch_chars: 32000,
max_batch_items: 16,
providers: vec![ProviderConfig {
name: "test-local".to_string(),
base_url: "http://localhost:12345".to_string(),
model: "test-model".to_string(),
priority: 1,
endpoint: "/v1/embeddings".to_string(),
}],
..Default::default()
};
match MLXBridge::new(&config).await {
Ok(bridge) => Some(Arc::new(Mutex::new(bridge))),
Err(_) => None,
}
}
macro_rules! require_mlx {
($mlx:expr) => {
match $mlx {
Some(bridge) => bridge,
None => {
eprintln!("⚠ Skipping test: MLX server unavailable at localhost:12345");
return Ok(());
}
}
};
}
#[tokio::test]
async fn memory_roundtrip_and_search() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let storage = Arc::new(StorageManager::new(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let rag = RAGPipeline::new(mlx, storage.clone()).await?;
let returned_id = rag
.index_text_with_mode(
Some("testns"),
"doc1".to_string(),
"Ala ma kota".to_string(),
json!({"lang": "pl"}),
SliceMode::Flat,
)
.await?;
assert_eq!(returned_id, "doc1");
let fetched = rag
.lookup_memory("testns", "doc1")
.await?
.ok_or_else(|| anyhow!("doc missing"))?;
assert_eq!(fetched.text, "Ala ma kota");
assert_eq!(fetched.namespace, "testns");
let results = rag.search_memory("testns", "kota", 1).await?;
assert!(!results.is_empty(), "expected at least one search result");
assert_eq!(results[0].namespace, "testns");
Ok(())
}
#[tokio::test]
async fn rag_pipeline_syncs_bm25_writes() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let bm25_path = tmp.path().join(".bm25");
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let bm25 = Arc::new(BM25Index::new(
&BM25Config::default().with_path(bm25_path.to_string_lossy().into_owned()),
)?);
let rag = RAGPipeline::new_with_bm25(mlx, storage, Some(bm25.clone())).await?;
rag.index_text_with_mode(
Some("testns"),
"doc1".to_string(),
"Ala ma kota".to_string(),
json!({"lang": "pl"}),
SliceMode::Flat,
)
.await?;
let results = bm25.search("kota", Some("testns"), 10)?;
assert_eq!(results.len(), 1);
assert_eq!(results[0].0, "doc1");
Ok(())
}
#[tokio::test]
async fn rag_pipeline_rolls_back_lance_write_when_bm25_write_fails() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let bm25_path = tmp.path().join(".bm25");
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let bm25 = Arc::new(BM25Index::new(
&BM25Config::default()
.with_path(bm25_path.to_string_lossy().into_owned())
.with_read_only(true),
)?);
let rag = RAGPipeline::new_with_bm25(mlx, storage.clone(), Some(bm25)).await?;
let err = rag
.index_text_with_mode(
Some("rollback-ns"),
"doc1".to_string(),
"This batch should never survive a failed BM25 write because rollback keeps LanceDB truthful."
.to_string(),
json!({"kind": "rollback-guard"}),
SliceMode::Flat,
)
.await
.expect_err("read-only BM25 should reject the write path");
assert!(
err.to_string().contains("read-only mode"),
"unexpected error: {err}"
);
assert!(
storage
.get_all_in_namespace("rollback-ns")
.await?
.is_empty(),
"LanceDB rows should be rolled back when BM25 indexing fails"
);
Ok(())
}
#[tokio::test]
async fn onion_text_index_overrides_stale_slice_mode_metadata() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let rag = RAGPipeline::new(mlx, storage).await?;
rag.index_text_with_mode(
Some("onion-test"),
"doc1".to_string(),
"This is a longer document about onions and embeddings. It should create layered slices for metadata verification."
.to_string(),
json!({"slice_mode": "flat"}),
SliceMode::OnionFast,
)
.await?;
let docs = rag
.storage_manager()
.get_all_in_namespace("onion-test")
.await?;
assert!(!docs.is_empty(), "expected onion slices to be stored");
assert!(
docs.iter()
.all(|doc| doc.metadata["slice_mode"] == "onion-fast"),
"all stored slices should carry the actual onion slice_mode"
);
Ok(())
}
#[test]
fn search_options_default_to_outer_layer() {
assert_eq!(
SearchOptions::default().layer_filter,
Some(SliceLayer::Outer)
);
}
#[test]
fn test_content_hash_deterministic() {
let content = "Test content for hashing";
let hash1 = compute_content_hash(content);
let hash2 = compute_content_hash(content);
assert_eq!(hash1, hash2);
assert_eq!(hash1.len(), 64);
let hash3 = compute_content_hash("Different content");
assert_ne!(hash1, hash3);
}
#[test]
fn test_content_hash_slight_difference() {
let hash1 = compute_content_hash("Test content");
let hash2 = compute_content_hash("Test content.");
assert_ne!(hash1, hash2);
}
#[tokio::test]
async fn test_exact_dedup_skips_identical_content() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let test_file = tmp.path().join("test.txt");
let content = "This is test content for deduplication testing.";
std::fs::write(&test_file, content)?;
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let rag = RAGPipeline::new(mlx, storage.clone()).await?;
let result1 = rag
.index_document_with_dedup(&test_file, Some("dedup-test"), SliceMode::Flat)
.await?;
assert!(result1.was_indexed(), "First indexing should succeed");
let result2 = rag
.index_document_with_dedup(&test_file, Some("dedup-test"), SliceMode::Flat)
.await?;
assert!(
result2.is_skipped(),
"Second indexing should be skipped as duplicate"
);
assert_eq!(result1.content_hash(), result2.content_hash());
Ok(())
}
#[tokio::test]
async fn test_dedup_allows_different_content() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let test_file1 = tmp.path().join("test1.txt");
let test_file2 = tmp.path().join("test2.txt");
std::fs::write(&test_file1, "Content of file one.")?;
std::fs::write(&test_file2, "Content of file two.")?;
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let rag = RAGPipeline::new(mlx, storage.clone()).await?;
let result1 = rag
.index_document_with_dedup(&test_file1, Some("dedup-test"), SliceMode::Flat)
.await?;
assert!(result1.was_indexed());
let result2 = rag
.index_document_with_dedup(&test_file2, Some("dedup-test"), SliceMode::Flat)
.await?;
assert!(result2.was_indexed(), "Different content should be indexed");
assert_ne!(result1.content_hash(), result2.content_hash());
Ok(())
}
#[tokio::test]
async fn test_dedup_different_namespaces() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let test_file = tmp.path().join("test.txt");
std::fs::write(&test_file, "Same content in different namespaces.")?;
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let rag = RAGPipeline::new(mlx, storage.clone()).await?;
let result1 = rag
.index_document_with_dedup(&test_file, Some("namespace-a"), SliceMode::Flat)
.await?;
assert!(result1.was_indexed());
let result2 = rag
.index_document_with_dedup(&test_file, Some("namespace-b"), SliceMode::Flat)
.await?;
assert!(
result2.was_indexed(),
"Same content in different namespace should be indexed"
);
Ok(())
}
#[tokio::test]
async fn test_has_content_hash() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let rag = RAGPipeline::new(mlx, storage.clone()).await?;
let test_file = tmp.path().join("test.txt");
let content = "Content for hash lookup test.";
std::fs::write(&test_file, content)?;
let content_hash = compute_content_hash(content);
let exists_before = rag
.storage_manager()
.has_content_hash("hash-test", &content_hash)
.await?;
assert!(!exists_before, "Hash should not exist before indexing");
let result = rag
.index_document_with_dedup(&test_file, Some("hash-test"), SliceMode::Flat)
.await?;
assert!(result.was_indexed());
let exists_after = rag
.storage_manager()
.has_content_hash("hash-test", &content_hash)
.await?;
assert!(exists_after, "Hash should exist after indexing");
let fake_hash = compute_content_hash("non-existent content");
let fake_exists = rag
.storage_manager()
.has_content_hash("hash-test", &fake_hash)
.await?;
assert!(!fake_exists, "Non-existent hash should return false");
Ok(())
}
#[tokio::test]
async fn memory_upsert_replaces_onion_family_by_original_id() -> Result<()> {
let mlx = require_mlx!(try_mlx_bridge().await);
let tmp = tempfile::tempdir()?;
let db_path = tmp.path().join(".lancedb");
let storage = Arc::new(StorageManager::new_lance_only(&db_path.to_string_lossy()).await?);
storage.ensure_collection().await?;
let rag = RAGPipeline::new(mlx, storage.clone()).await?;
let namespace = "memory-upsert-onion";
let initial_text = "This is a long document about indexing and embeddings. It exists to force onion slicing during the first upsert. \
The content is intentionally verbose and repetitive so the outer and core layers both exist and can later be replaced cleanly. \
Project Vista keeps appearing here to provide a stable metadata anchor for search filters.";
let updated_text = "This is an updated long document about project search filters, namespace cache invalidation, and release hardening. \
It should fully replace the previous onion family instead of appending stale slices. \
The text stays comfortably above the slicing threshold so onion-fast still writes both outer and core documents.";
rag.memory_upsert(
namespace,
"doc-1".to_string(),
initial_text.to_string(),
json!({"slice_mode": "onion-fast", "project": "Vista"}),
)
.await?;
let initial_docs = rag
.storage_manager()
.get_all_in_namespace(namespace)
.await?;
assert_eq!(
initial_docs.len(),
2,
"onion-fast should create outer + core"
);
assert!(
initial_docs
.iter()
.all(|doc| doc.metadata["original_id"] == "doc-1")
);
let fetched = rag
.lookup_memory(namespace, "doc-1")
.await?
.ok_or_else(|| anyhow!("expected lookup by original id to resolve onion family"))?;
assert_eq!(fetched.layer, Some(SliceLayer::Outer));
assert_eq!(fetched.metadata["original_id"], "doc-1");
rag.memory_upsert(
namespace,
"doc-1".to_string(),
updated_text.to_string(),
json!({"slice_mode": "onion-fast", "project": "Vista"}),
)
.await?;
let updated_docs = rag
.storage_manager()
.get_all_in_namespace(namespace)
.await?;
assert_eq!(
updated_docs.len(),
2,
"upsert should replace the onion family instead of duplicating it"
);
assert!(
updated_docs
.iter()
.all(|doc| doc.metadata["original_id"] == "doc-1")
);
assert!(
updated_docs
.iter()
.any(|doc| doc.document.contains("namespace cache invalidation")),
"updated content should be present in the stored family"
);
let deleted = rag.remove_memory(namespace, "doc-1").await?;
assert_eq!(
deleted, 2,
"remove_memory should delete the whole onion family"
);
assert!(
rag.storage_manager()
.get_all_in_namespace(namespace)
.await?
.is_empty()
);
Ok(())
}