use leindex::storage::schema::{
StorageConfig, StoragePool, DEFAULT_READER_POOL_SIZE, PROJECT_READER_CACHE_SIZE_KIB,
PROJECT_STORE_MMAP_SIZE, PROJECT_WRITER_CACHE_SIZE_KIB,
};
use tempfile::TempDir;
fn make_pool_dir() -> TempDir {
tempfile::tempdir().unwrap()
}
fn writer_config() -> StorageConfig {
StorageConfig {
db_path: "leindex.db".to_string(),
wal_enabled: true,
cache_size_kib: Some(PROJECT_WRITER_CACHE_SIZE_KIB),
mmap_size: Some(PROJECT_STORE_MMAP_SIZE),
}
}
fn reader_config() -> StorageConfig {
StorageConfig {
db_path: "leindex.db".to_string(),
wal_enabled: true,
cache_size_kib: Some(PROJECT_READER_CACHE_SIZE_KIB),
mmap_size: Some(PROJECT_STORE_MMAP_SIZE),
}
}
#[test]
fn test_pool_opens_one_writer_and_fixed_reader_pool() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
assert!(pool.has_writer(), "pool should have a writer connection");
assert_eq!(
pool.reader_count(),
DEFAULT_READER_POOL_SIZE,
"reader pool should have {} connections",
DEFAULT_READER_POOL_SIZE,
);
}
#[test]
fn test_pool_concurrent_reads_succeed() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
pool.writer()
.conn()
.execute(
"CREATE TABLE IF NOT EXISTS test_data (key TEXT PRIMARY KEY, val INTEGER)",
[],
)
.unwrap();
pool.writer()
.conn()
.execute("INSERT INTO test_data (key, val) VALUES ('a', 1)", [])
.unwrap();
pool.writer()
.conn()
.execute("INSERT INTO test_data (key, val) VALUES ('b', 2)", [])
.unwrap();
for i in 0..DEFAULT_READER_POOL_SIZE {
let reader = pool.reader(i).expect("should get reader");
let val: i64 = reader
.conn()
.query_row("SELECT val FROM test_data WHERE key = 'a'", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(val, 1, "reader {} should read correct data", i);
}
}
#[test]
fn test_pool_no_unbounded_connection_fanout() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
let result = pool.reader(DEFAULT_READER_POOL_SIZE);
assert!(
result.is_err(),
"requesting reader beyond pool size should fail",
);
}
#[test]
fn test_reader_connections_have_thin_cache() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
for i in 0..DEFAULT_READER_POOL_SIZE {
let reader = pool.reader(i).unwrap();
let cache_size: i64 = reader
.conn()
.query_row("PRAGMA cache_size", [], |row| row.get(0))
.unwrap();
assert_eq!(
cache_size, PROJECT_READER_CACHE_SIZE_KIB,
"reader {} cache_size should be {} (2 MiB thin), got {}",
i, PROJECT_READER_CACHE_SIZE_KIB, cache_size,
);
}
}
#[test]
fn test_reader_connections_have_correct_mmap_cap() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
for i in 0..DEFAULT_READER_POOL_SIZE {
let reader = pool.reader(i).unwrap();
let mmap_size: i64 = reader
.conn()
.query_row("PRAGMA mmap_size", [], |row| row.get(0))
.unwrap();
assert_eq!(
mmap_size, PROJECT_STORE_MMAP_SIZE,
"reader {} mmap_size should be {} (64 MiB), got {}",
i, PROJECT_STORE_MMAP_SIZE, mmap_size,
);
}
}
#[test]
fn test_writer_has_writer_cache_budget() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
let cache_size: i64 = pool
.writer()
.conn()
.query_row("PRAGMA cache_size", [], |row| row.get(0))
.unwrap();
assert_eq!(
cache_size, PROJECT_WRITER_CACHE_SIZE_KIB,
"writer cache_size should be {} (16 MiB), got {}",
PROJECT_WRITER_CACHE_SIZE_KIB, cache_size,
);
}
#[test]
fn test_writer_performs_write_operations() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
pool.writer()
.conn()
.execute(
"CREATE TABLE test_write (id INTEGER PRIMARY KEY, data TEXT)",
[],
)
.unwrap();
pool.writer()
.conn()
.execute("INSERT INTO test_write (id, data) VALUES (1, 'hello')", [])
.unwrap();
let reader = pool.reader(0).unwrap();
let count: i64 = reader
.conn()
.query_row("SELECT COUNT(*) FROM test_write", [], |row| row.get(0))
.unwrap();
assert_eq!(count, 1, "reader should see writer's data");
}
#[test]
fn test_writer_and_reader_roles_are_distinct() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
let writer_cache: i64 = pool
.writer()
.conn()
.query_row("PRAGMA cache_size", [], |row| row.get(0))
.unwrap();
let reader_cache: i64 = pool
.reader(0)
.unwrap()
.conn()
.query_row("PRAGMA cache_size", [], |row| row.get(0))
.unwrap();
assert_ne!(
writer_cache, reader_cache,
"writer cache ({}) should differ from reader cache ({})",
writer_cache, reader_cache,
);
assert!(
writer_cache.abs() > reader_cache.abs(),
"writer cache ({}) should be larger than reader cache ({})",
writer_cache,
reader_cache,
);
}
#[test]
fn test_indexing_backpressure_stays_bounded() {
use leindex::search::search::IndexingAdmissionGate;
let mut gate = IndexingAdmissionGate::with_caps(50, 10_000);
let mut admitted = 0;
let mut shed = 0;
for i in 0..200 {
let content_bytes = 100 + (i % 50) * 10;
if gate.try_admit(content_bytes) {
admitted += 1;
} else {
shed += 1;
}
}
assert!(
admitted <= 50,
"admitted {} nodes, expected at most 50",
admitted,
);
assert!(
shed > 0,
"expected some nodes to be shed under pressure, admitted={}, shed={}",
admitted,
shed,
);
}
#[test]
fn test_parser_pool_is_bounded() {
use leindex::parse::parallel::ParallelParser;
let dir = tempfile::tempdir().unwrap();
let mut paths = Vec::new();
for i in 0..50 {
let path = dir.path().join(format!("file_{:03}.rs", i));
std::fs::write(&path, format!("fn func_{}() {{}}", i)).unwrap();
paths.push(path);
}
let parser = ParallelParser::new().with_max_threads(4);
let (results, stats) = parser.parse_files_with_stats(paths);
assert_eq!(stats.total_files, 50);
assert!(
results.iter().filter(|r| r.is_success()).count() > 0,
"at least some files should parse successfully",
);
}
#[test]
fn test_backpressure_with_large_content() {
use leindex::search::search::IndexingAdmissionGate;
let mut gate = IndexingAdmissionGate::with_caps(100, 1024);
assert!(
!gate.try_admit(2048),
"oversized content (2 KiB) should be rejected when cap is 1 KiB",
);
assert_eq!(gate.nodes_admitted(), 0);
assert!(
gate.try_admit(256),
"small content (256 B) should be admitted",
);
assert_eq!(gate.nodes_admitted(), 1);
}
#[test]
fn test_pool_writer_wal_mode_enabled() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
let journal_mode: String = pool
.writer()
.conn()
.query_row("PRAGMA journal_mode", [], |row| row.get(0))
.unwrap();
assert_eq!(
journal_mode.to_lowercase(),
"wal",
"writer should use WAL mode, got {}",
journal_mode,
);
}
#[test]
fn test_pool_readers_see_writer_data_via_wal() {
let dir = make_pool_dir();
let db_path = dir.path().join("leindex.db");
let pool = StoragePool::open(&db_path, writer_config(), reader_config()).unwrap();
pool.writer()
.conn()
.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v INTEGER)", [])
.unwrap();
for i in 0..10 {
pool.writer()
.conn()
.execute(
"INSERT INTO kv (k, v) VALUES (?1, ?2)",
rusqlite::params![format!("key{}", i), i],
)
.unwrap();
}
for slot in 0..DEFAULT_READER_POOL_SIZE {
let reader = pool.reader(slot).unwrap();
let count: i64 = reader
.conn()
.query_row("SELECT COUNT(*) FROM kv", [], |row| row.get(0))
.unwrap();
assert_eq!(count, 10, "reader {} should see all 10 rows", slot,);
}
}