use crate::storage::{ProjectMetadata, UniqueProjectId};
use rusqlite::{Connection, Result as SqliteResult};
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Number of reader connections opened by [`StoragePool::open`].
pub const DEFAULT_READER_POOL_SIZE: usize = 2;

/// `PRAGMA cache_size` value named for the global registry database.
///
/// SQLite interprets a negative `cache_size` as a budget in KiB.
/// NOTE(review): not referenced in the visible part of this file.
pub const GLOBAL_REGISTRY_CACHE_SIZE_KIB: i64 = -2_000;

/// `PRAGMA mmap_size` value named for the global registry database; `0` disables
/// memory-mapped I/O in SQLite.
/// NOTE(review): not referenced in the visible part of this file.
pub const GLOBAL_REGISTRY_MMAP_SIZE: i64 = 0;

/// `PRAGMA cache_size` value used by [`StorageConfig::default`] (described as
/// 16 MiB by this file's tests).
pub const PROJECT_WRITER_CACHE_SIZE_KIB: i64 = -16_000;

/// `PRAGMA cache_size` value for reader connections (described as a 2 MiB thin
/// budget by this file's tests).
pub const PROJECT_READER_CACHE_SIZE_KIB: i64 = -2_000;

/// `PRAGMA mmap_size` value for project stores: 64 MiB expressed in bytes.
pub const PROJECT_STORE_MMAP_SIZE: i64 = 64 * 1024 * 1024;
/// Settings applied to a SQLite connection when a `Storage` is opened.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
/// Database file path.
/// NOTE(review): `Storage::open_with_config` opens the path passed as its
/// argument and never reads this field — confirm the intended use.
pub db_path: String,
/// When `true`, the connection is switched to `journal_mode = WAL` on open and
/// `Storage::close` issues a WAL checkpoint.
pub wal_enabled: bool,
/// Value for `PRAGMA cache_size`; `None` leaves the pragma untouched.
pub cache_size_kib: Option<i64>,
/// Value for `PRAGMA mmap_size`; `None` leaves the pragma untouched.
pub mmap_size: Option<i64>,
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
db_path: "leindex.db".to_string(),
wal_enabled: true,
cache_size_kib: Some(PROJECT_WRITER_CACHE_SIZE_KIB),
mmap_size: Some(PROJECT_STORE_MMAP_SIZE),
}
}
}
/// A single SQLite connection together with the configuration it was opened with.
pub struct Storage {
    /// Underlying SQLite connection.
    conn: Connection,
    /// Configuration supplied at open time. It is read by `close` (for
    /// `wal_enabled`), so no dead-code suppression is needed on this field.
    config: StorageConfig,
}
impl Storage {
/// Opens the database at `path` using [`StorageConfig::default`].
///
/// # Errors
/// Returns whatever error `open_with_config` reports.
pub fn open<P: AsRef<Path>>(path: P) -> SqliteResult<Self> {
    let defaults = StorageConfig::default();
    Self::open_with_config(path, defaults)
}
/// Opens the database at `path`, applies the connection pragmas described by
/// `config`, then runs migrations and creates any missing schema objects.
///
/// Pragmas are applied in this order: `busy_timeout` (5000), `journal_mode = WAL`
/// (only when `config.wal_enabled`), `cache_size` and `mmap_size` (each only when
/// the corresponding option is `Some`).
///
/// `config.db_path` is not consulted here; `path` decides which file is opened.
///
/// # Errors
/// Returns the first SQLite error raised while opening the connection, applying
/// a pragma, migrating, or initializing the schema.
pub fn open_with_config<P: AsRef<Path>>(path: P, config: StorageConfig) -> SqliteResult<Self> {
    let conn = Connection::open(path)?;

    // Install the busy timeout before any statement that may need a lock, so the
    // journal-mode switch below waits for a concurrent connection instead of
    // failing immediately with SQLITE_BUSY.
    conn.pragma_update(None, "busy_timeout", 5000)?;

    if config.wal_enabled {
        conn.pragma_update(None, "journal_mode", "WAL")?;
    }
    if let Some(cache_size_kib) = config.cache_size_kib {
        conn.pragma_update(None, "cache_size", cache_size_kib)?;
    }
    if let Some(mmap_size) = config.mmap_size {
        conn.pragma_update(None, "mmap_size", mmap_size)?;
    }

    let mut storage = Self { conn, config };
    // Migrations run first so that pre-existing tables are upgraded before the
    // `CREATE ... IF NOT EXISTS` statements see them.
    storage.run_migrations()?;
    storage.initialize_schema()?;
    Ok(storage)
}
/// Creates every table and index this store uses, if they do not already exist,
/// and upgrades a pre-existing `intel_nodes` table by adding any missing columns.
///
/// Statements run one at a time in the order written; the first failure is
/// returned and later statements are not attempted.
fn initialize_schema(&mut self) -> SqliteResult<()> {
// project_metadata: one row per project; `unique_project_id` and
// `canonical_path` are each constrained to be unique.
let project_metadata_schema = r#"
CREATE TABLE IF NOT EXISTS project_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
unique_project_id TEXT UNIQUE NOT NULL,
base_name TEXT NOT NULL,
path_hash TEXT NOT NULL,
instance INTEGER DEFAULT 0,
canonical_path TEXT NOT NULL,
display_name TEXT,
is_clone BOOLEAN DEFAULT 0,
cloned_from TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_indexed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(canonical_path)
)
"#;
// Lookup indexes for project_metadata.
let project_metadata_indexes = [
"CREATE INDEX IF NOT EXISTS idx_project_metadata_unique_id ON project_metadata(unique_project_id)",
"CREATE INDEX IF NOT EXISTS idx_project_metadata_canonical_path ON project_metadata(canonical_path)",
"CREATE INDEX IF NOT EXISTS idx_project_metadata_base_hash ON project_metadata(base_name, path_hash)",
"CREATE INDEX IF NOT EXISTS idx_project_metadata_base_name ON project_metadata(base_name)",
];
// The table must exist before its indexes can be created.
self.conn.execute(project_metadata_schema, [])?;
for index_sql in project_metadata_indexes {
self.conn.execute(index_sql, [])?;
}
// indexed_files: per-file record keyed by `file_path`.
self.conn.execute(
"CREATE TABLE IF NOT EXISTS indexed_files (
file_path TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
file_hash TEXT NOT NULL,
last_indexed INTEGER NOT NULL
)",
[],
)?;
// intel_nodes: the complete current column set, used when the table is new.
self.conn.execute(
"CREATE TABLE IF NOT EXISTS intel_nodes (
id INTEGER PRIMARY KEY,
project_id TEXT NOT NULL,
file_path TEXT NOT NULL,
node_id TEXT NOT NULL,
symbol_name TEXT NOT NULL,
qualified_name TEXT NOT NULL,
language TEXT NOT NULL DEFAULT 'unknown',
node_type TEXT NOT NULL,
signature TEXT,
complexity INTEGER,
content_hash TEXT NOT NULL,
embedding BLOB,
byte_range_start INTEGER,
byte_range_end INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
embedding_format INTEGER
)",
[],
)?;
// Snapshot the column names actually present (field 1 of each table_info row)
// so a pre-existing table that lacks newer columns can be upgraded below.
let columns: Vec<String> = self
.conn
.prepare("PRAGMA table_info(intel_nodes)")?
.query_map([], |row| row.get::<_, String>(1))?
.collect::<SqliteResult<Vec<_>>>()?;
// Add `node_id` if missing and backfill it from `symbol_name`.
if !columns.iter().any(|c| c == "node_id") {
self.conn.execute(
"ALTER TABLE intel_nodes ADD COLUMN node_id TEXT DEFAULT ''",
[],
)?;
self.conn.execute(
"UPDATE intel_nodes SET node_id = symbol_name WHERE node_id = ''",
[],
)?;
}
// Add `qualified_name` if missing and backfill it from `symbol_name`.
if !columns.iter().any(|c| c == "qualified_name") {
self.conn.execute(
"ALTER TABLE intel_nodes ADD COLUMN qualified_name TEXT DEFAULT ''",
[],
)?;
self.conn.execute(
"UPDATE intel_nodes SET qualified_name = symbol_name WHERE qualified_name = ''",
[],
)?;
}
// The remaining columns are added without a backfill step.
if !columns.iter().any(|c| c == "language") {
self.conn.execute(
"ALTER TABLE intel_nodes ADD COLUMN language TEXT DEFAULT 'unknown'",
[],
)?;
}
if !columns.iter().any(|c| c == "byte_range_start") {
self.conn.execute(
"ALTER TABLE intel_nodes ADD COLUMN byte_range_start INTEGER",
[],
)?;
}
if !columns.iter().any(|c| c == "byte_range_end") {
self.conn.execute(
"ALTER TABLE intel_nodes ADD COLUMN byte_range_end INTEGER",
[],
)?;
}
if !columns.iter().any(|c| c == "embedding_format") {
self.conn.execute(
"ALTER TABLE intel_nodes ADD COLUMN embedding_format INTEGER",
[],
)?;
}
// intel_edges: typed links between two intel_nodes rows, unique per
// (caller_id, callee_id, edge_type).
self.conn.execute(
"CREATE TABLE IF NOT EXISTS intel_edges (
caller_id INTEGER NOT NULL,
callee_id INTEGER NOT NULL,
edge_type TEXT NOT NULL,
metadata TEXT,
FOREIGN KEY(caller_id) REFERENCES intel_nodes(id),
FOREIGN KEY(callee_id) REFERENCES intel_nodes(id),
PRIMARY KEY(caller_id, callee_id, edge_type)
)",
[],
)?;
// analysis_cache: blobs keyed by `node_hash`.
self.conn.execute(
"CREATE TABLE IF NOT EXISTS analysis_cache (
node_hash TEXT PRIMARY KEY,
cfg_data BLOB,
complexity_metrics BLOB,
timestamp INTEGER NOT NULL
)",
[],
)?;
// cache_telemetry: counters table whose CHECK constraint only admits id = 1,
// i.e. at most one row.
self.conn.execute(
"CREATE TABLE IF NOT EXISTS cache_telemetry (
id INTEGER PRIMARY KEY CHECK (id = 1),
cache_hits INTEGER NOT NULL DEFAULT 0,
cache_misses INTEGER NOT NULL DEFAULT 0,
cache_writes INTEGER NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
)",
[],
)?;
// Seed the counters row; OR IGNORE leaves an existing row untouched.
self.conn.execute(
"INSERT OR IGNORE INTO cache_telemetry (id, cache_hits, cache_misses, cache_writes, updated_at)
VALUES (1, 0, 0, 0, strftime('%s', 'now'))",
[],
)?;
// global_symbols: symbol rows keyed by `symbol_id`, unique per
// (project_id, symbol_name, signature).
self.conn.execute(
"CREATE TABLE IF NOT EXISTS global_symbols (
symbol_id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
symbol_name TEXT NOT NULL,
symbol_type TEXT NOT NULL,
signature TEXT,
file_path TEXT NOT NULL,
byte_range_start INTEGER,
byte_range_end INTEGER,
complexity INTEGER DEFAULT 1,
is_public INTEGER DEFAULT 0,
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
UNIQUE(project_id, symbol_name, signature)
)",
[],
)?;
// external_refs: references whose source and target both point at
// global_symbols rows.
self.conn.execute(
"CREATE TABLE IF NOT EXISTS external_refs (
ref_id TEXT PRIMARY KEY,
source_project_id TEXT NOT NULL,
source_symbol_id TEXT NOT NULL,
target_project_id TEXT NOT NULL,
target_symbol_id TEXT NOT NULL,
ref_type TEXT NOT NULL,
FOREIGN KEY (source_symbol_id) REFERENCES global_symbols(symbol_id),
FOREIGN KEY (target_symbol_id) REFERENCES global_symbols(symbol_id)
)",
[],
)?;
// project_deps: at most one row per (project_id, depends_on_project_id) pair.
self.conn.execute(
"CREATE TABLE IF NOT EXISTS project_deps (
dep_id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
depends_on_project_id TEXT NOT NULL,
dependency_type TEXT NOT NULL,
UNIQUE(project_id, depends_on_project_id)
)",
[],
)?;
// Secondary indexes on intel_nodes.
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_nodes_project ON intel_nodes(project_id)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_nodes_file ON intel_nodes(file_path)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_nodes_symbol ON intel_nodes(symbol_name)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_nodes_hash ON intel_nodes(content_hash)",
[],
)?;
// Secondary indexes on global_symbols; the last one is a partial index that
// only covers rows with is_public = 1.
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_global_symbols_name ON global_symbols(symbol_name)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_global_symbols_type ON global_symbols(symbol_type)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_global_symbols_project ON global_symbols(project_id)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_global_symbols_public ON global_symbols(symbol_id) WHERE is_public = 1",
[],
)?;
// Secondary indexes on external_refs and project_deps.
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_external_refs_source ON external_refs(source_symbol_id)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_external_refs_target ON external_refs(target_symbol_id)",
[],
)?;
self.conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_deps_project ON project_deps(project_id)",
[],
)?;
// trigram_index: one `index_data` blob per `project_id`.
self.conn.execute(
"CREATE TABLE IF NOT EXISTS trigram_index (
project_id TEXT PRIMARY KEY,
index_data BLOB NOT NULL,
node_count INTEGER NOT NULL DEFAULT 0,
trigram_count INTEGER NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
)",
[],
)?;
Ok(())
}
pub fn conn(&self) -> &Connection {
&self.conn
}
pub fn conn_mut(&mut self) -> &mut Connection {
&mut self.conn
}
/// Checkpoints the write-ahead log when WAL is enabled; otherwise does nothing.
///
/// The connection itself is left open: this method only borrows `self`, so the
/// handle is released when the `Storage` value is dropped.
///
/// # Errors
/// Returns the SQLite error if the checkpoint statement cannot be run.
pub fn close(&mut self) -> SqliteResult<()> {
    if self.config.wal_enabled {
        // `PRAGMA wal_checkpoint` reports its outcome as a single result row.
        // `Connection::execute` rejects row-returning statements with
        // `ExecuteReturnedResults`, so the pragma has to go through a query API;
        // the row's contents are not needed here.
        self.conn
            .query_row("PRAGMA wal_checkpoint(TRUNCATE)", [], |_row| Ok(()))?;
    }
    Ok(())
}
pub fn load_existing_ids(&self, base_name: &str) -> SqliteResult<Vec<UniqueProjectId>> {
ProjectMetadata::load_existing_ids(&self.conn, base_name)
.map_err(|_| rusqlite::Error::InvalidQuery)
}
pub fn store_project_metadata(
&self,
unique_id: &UniqueProjectId,
project_path: &Path,
) -> SqliteResult<()> {
let metadata = ProjectMetadata::new(project_path);
let metadata = ProjectMetadata {
unique_project_id: unique_id.clone(),
..metadata
};
metadata
.save(&self.conn)
.map_err(|_| rusqlite::Error::InvalidQuery)
}
/// Schema version this build expects; `run_migrations` records it in the
/// `schema_version` table and refuses databases that report a higher value.
const SCHEMA_VERSION: u32 = 2;
/// Brings an existing database up to `SCHEMA_VERSION` and records that version.
///
/// Creates the `schema_version` bookkeeping table if needed, reads the stored
/// version (0 when no row exists), applies the pending migration steps, and
/// writes the new version. When the stored version already equals
/// `SCHEMA_VERSION` nothing is written.
///
/// # Errors
/// * `rusqlite::Error::InvalidParameterName` when the database reports a version
///   newer than this build supports.
/// * Any SQLite error raised while creating the table, reading the version,
///   migrating, or recording the version.
fn run_migrations(&mut self) -> SqliteResult<()> {
    self.conn.execute(
        "CREATE TABLE IF NOT EXISTS schema_version (\n\
         key TEXT PRIMARY KEY,\n\
         version INTEGER NOT NULL\n\
         )",
        [],
    )?;

    // The aggregate wrapped in COALESCE always yields exactly one row, so a
    // failure here is a genuine database problem. It is propagated rather than
    // being mistaken for "version 0", which would re-run migrations and
    // overwrite the stored version.
    let current: u32 = self.conn.query_row(
        "SELECT COALESCE(MAX(version), 0) FROM schema_version WHERE key = 'schema'",
        [],
        |row| row.get(0),
    )?;

    if current > Self::SCHEMA_VERSION {
        return Err(rusqlite::Error::InvalidParameterName(format!(
            "Database schema v{} is newer than this version (v{}). Please upgrade LeIndex.",
            current,
            Self::SCHEMA_VERSION
        )));
    }

    // Already current: the stored row holds this exact value, so rewriting it
    // would only take a write lock for no change.
    if current == Self::SCHEMA_VERSION {
        return Ok(());
    }

    if current < 2 {
        self.migrate_v1_to_v2()?;
    }

    self.conn.execute(
        "INSERT OR REPLACE INTO schema_version (key, version) VALUES ('schema', ?1)",
        [Self::SCHEMA_VERSION],
    )?;
    Ok(())
}
/// Migration step 1 → 2: adds the `last_indexed` column to `project_metadata`.
///
/// Does nothing when the table does not exist or already has the column.
///
/// # Errors
/// Returns any SQLite error raised while inspecting or altering the table.
fn migrate_v1_to_v2(&mut self) -> SqliteResult<()> {
    let has_project_metadata: bool = self.conn.query_row(
        "SELECT EXISTS(\n\
         SELECT 1 FROM sqlite_master\n\
         WHERE type = 'table' AND name = 'project_metadata'\n\
         )",
        [],
        |row| row.get(0),
    )?;
    if !has_project_metadata {
        return Ok(());
    }

    // Read all column names first; the statement is dropped before the table is
    // altered.
    let mut table_info = self.conn.prepare("PRAGMA table_info(project_metadata)")?;
    let column_names = table_info
        .query_map([], |row| row.get::<_, String>(1))?
        .collect::<SqliteResult<Vec<String>>>()?;
    drop(table_info);

    let already_present = column_names.iter().any(|name| name == "last_indexed");
    if already_present {
        return Ok(());
    }

    self.conn.execute(
        "ALTER TABLE project_metadata ADD COLUMN last_indexed TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
        [],
    )?;
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Creates the scratch file that backs a test database.
    ///
    /// Callers bind the returned handle before opening a `Storage` on it, so the
    /// handle is dropped after the storage.
    fn scratch_db() -> NamedTempFile {
        NamedTempFile::new().unwrap()
    }

    /// Runs `sql` on the storage's connection and returns the first column of
    /// its single result row as an integer.
    fn single_i64(storage: &Storage, sql: &str) -> i64 {
        storage.conn.query_row(sql, [], |row| row.get(0)).unwrap()
    }

    /// Opening a fresh database succeeds.
    #[test]
    fn test_storage_creation() {
        let db = scratch_db();
        let opened = Storage::open(db.path());
        assert!(opened.is_ok());
    }

    /// A fresh database contains the expected number of matching tables.
    #[test]
    fn test_schema_initialization() {
        let db = scratch_db();
        let storage = Storage::open(db.path()).unwrap();
        let table_count = single_i64(
            &storage,
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND (name LIKE 'intel_%' OR name = 'analysis_cache' OR name = 'cache_telemetry' OR name LIKE 'global_%' OR name LIKE 'external_%' OR name LIKE 'project_%')",
        );
        assert_eq!(table_count, 8);
    }

    /// The default configuration applies the writer cache budget.
    #[test]
    fn test_project_writer_cache_budget() {
        let db = scratch_db();
        let storage = Storage::open(db.path()).unwrap();
        let cache_size = single_i64(&storage, "PRAGMA cache_size");
        assert_eq!(
            cache_size, PROJECT_WRITER_CACHE_SIZE_KIB,
            "project writer cache_size should be {} (16 MiB), got {}",
            PROJECT_WRITER_CACHE_SIZE_KIB, cache_size
        );
    }

    /// The default configuration applies the project-store mmap size.
    #[test]
    fn test_project_store_mmap_cap() {
        let db = scratch_db();
        let storage = Storage::open(db.path()).unwrap();
        let mmap_size = single_i64(&storage, "PRAGMA mmap_size");
        assert_eq!(
            mmap_size, PROJECT_STORE_MMAP_SIZE,
            "project store mmap_size should be {} (64 MiB), got {}",
            PROJECT_STORE_MMAP_SIZE, mmap_size
        );
    }

    /// A reader-style configuration applies the reader cache budget.
    #[test]
    fn test_project_reader_cache_budget() {
        assert_eq!(
            PROJECT_READER_CACHE_SIZE_KIB, -2000,
            "reader cache should be -2000 (2 MiB thin budget)"
        );

        let db = scratch_db();
        let reader_config = StorageConfig {
            db_path: db.path().to_string_lossy().to_string(),
            wal_enabled: true,
            cache_size_kib: Some(PROJECT_READER_CACHE_SIZE_KIB),
            mmap_size: Some(PROJECT_STORE_MMAP_SIZE),
        };
        let storage = Storage::open_with_config(db.path(), reader_config).unwrap();
        let cache_size = single_i64(&storage, "PRAGMA cache_size");
        assert_eq!(
            cache_size, PROJECT_READER_CACHE_SIZE_KIB,
            "reader cache_size should be {} (2 MiB), got {}",
            PROJECT_READER_CACHE_SIZE_KIB, cache_size
        );
    }
}
/// Distinguishes the two kinds of connection a storage pool holds.
/// NOTE(review): not referenced in the visible part of this file — confirm where
/// it is consumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageRole {
/// The writer connection.
Writer,
/// One of the reader connections.
Reader,
}
/// One writer `Storage` plus a list of reader `Storage` connections.
pub struct StoragePool {
// Always present; set once by the `open*` constructors.
writer: Storage,
// Reader connections, addressed by index through `reader` / `reader_mut`.
readers: Vec<Storage>,
}
impl StoragePool {
    /// Opens one writer and `DEFAULT_READER_POOL_SIZE` readers on `db_path`.
    ///
    /// Equivalent to [`StoragePool::open_with_pool_size`] with the default size.
    ///
    /// # Errors
    /// Returns the first error raised while opening any connection.
    pub fn open<P: AsRef<Path>>(
        db_path: P,
        writer_config: StorageConfig,
        reader_config: StorageConfig,
    ) -> SqliteResult<Self> {
        Self::open_with_pool_size(
            db_path,
            writer_config,
            reader_config,
            DEFAULT_READER_POOL_SIZE,
        )
    }

    /// Opens one writer and `pool_size` readers on `db_path`.
    ///
    /// The writer is opened first with `writer_config`; each reader is then
    /// opened with its own clone of `reader_config`. Every connection goes
    /// through `Storage::open_with_config`.
    ///
    /// # Errors
    /// Returns the first error raised while opening any connection; connections
    /// opened before the failure are dropped.
    pub fn open_with_pool_size<P: AsRef<Path>>(
        db_path: P,
        writer_config: StorageConfig,
        reader_config: StorageConfig,
        pool_size: usize,
    ) -> SqliteResult<Self> {
        let db_path = db_path.as_ref();
        let writer = Storage::open_with_config(db_path, writer_config)?;
        let mut readers = Vec::with_capacity(pool_size);
        for _ in 0..pool_size {
            readers.push(Storage::open_with_config(db_path, reader_config.clone())?);
        }
        Ok(Self { writer, readers })
    }

    /// Always `true`: the writer field is not optional, so a pool cannot exist
    /// without one.
    pub fn has_writer(&self) -> bool {
        true
    }

    /// Borrows the writer connection.
    pub fn writer(&self) -> &Storage {
        &self.writer
    }

    /// Mutably borrows the writer connection.
    pub fn writer_mut(&mut self) -> &mut Storage {
        &mut self.writer
    }

    /// Borrows the reader at `index`.
    ///
    /// # Errors
    /// Returns `StoragePoolError::ReaderOutOfBounds` when `index` is not smaller
    /// than the number of readers.
    pub fn reader(&self, index: usize) -> Result<&Storage, StoragePoolError> {
        self.readers
            .get(index)
            .ok_or(StoragePoolError::ReaderOutOfBounds {
                index,
                pool_size: self.readers.len(),
            })
    }

    /// Mutably borrows the reader at `index`.
    ///
    /// # Errors
    /// Returns `StoragePoolError::ReaderOutOfBounds` when `index` is not smaller
    /// than the number of readers.
    pub fn reader_mut(&mut self, index: usize) -> Result<&mut Storage, StoragePoolError> {
        // Read the length up front: it cannot be queried while the mutable
        // borrow from `get_mut` is alive.
        let pool_size = self.readers.len();
        self.readers
            .get_mut(index)
            .ok_or(StoragePoolError::ReaderOutOfBounds { index, pool_size })
    }

    /// Number of reader connections in the pool.
    pub fn reader_count(&self) -> usize {
        self.readers.len()
    }

    /// Calls `Storage::close` on the writer and then on every reader.
    ///
    /// Every connection is attempted even if an earlier one fails, so a single
    /// failure does not leave the remaining connections unprocessed.
    ///
    /// # Errors
    /// Returns the first error encountered, after all connections were tried.
    pub fn close_all(&mut self) -> SqliteResult<()> {
        let mut outcome = self.writer.close();
        for reader in &mut self.readers {
            let closed = reader.close();
            // Keep the earliest failure; later results only matter while
            // everything so far has succeeded.
            if outcome.is_ok() {
                outcome = closed;
            }
        }
        outcome
    }
}
/// Errors returned by `StoragePool` accessors.
#[derive(Debug, thiserror::Error)]
pub enum StoragePoolError {
/// A reader was requested at an index the pool does not have.
#[error("reader index {index} out of bounds (pool size: {pool_size})")]
ReaderOutOfBounds {
/// The index that was requested.
index: usize,
/// Number of readers in the pool at the time of the request.
pool_size: usize,
},
}