#![allow(clippy::await_holding_lock)]
pub mod calls;
mod chunks;
mod metadata;
mod migrations;
mod notes;
mod search;
mod sparse;
mod types;
pub(crate) mod helpers;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
/// Process-wide mutex serializing write transactions (see `Store::begin_write`);
/// SQLite permits only one writer at a time, so all writers queue here first.
static WRITE_LOCK: Mutex<()> = Mutex::new(());
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
use sqlx::{ConnectOptions, SqlitePool};
use tokio::runtime::Runtime;
pub use calls::cross_project::{
CrossProjectCallee, CrossProjectCaller, CrossProjectContext, NamedStore,
};
pub use helpers::CallGraph;
pub use helpers::CallerInfo;
pub use helpers::CallerWithContext;
pub use helpers::ChunkIdentity;
pub use helpers::ChunkSummary;
pub use helpers::ParentContext;
pub use helpers::IndexStats;
pub use helpers::ModelInfo;
pub use helpers::NoteSearchResult;
pub use helpers::NoteStats;
pub use helpers::NoteSummary;
pub use helpers::SearchFilter;
pub use helpers::SearchResult;
pub use helpers::StaleFile;
pub use helpers::StaleReport;
pub use helpers::StoreError;
pub use helpers::UnifiedResult;
pub use helpers::CURRENT_SCHEMA_VERSION;
/// Default embedding model repository name, mirrored from the embedder module.
pub const MODEL_NAME: &str = crate::embedder::DEFAULT_MODEL_REPO;
/// Embedding vector dimensionality expected for the default model.
pub const EXPECTED_DIMENSIONS: usize = crate::EMBEDDING_DIM;
pub use helpers::DEFAULT_NAME_BOOST;
pub use helpers::score_name_match;
pub use helpers::score_name_match_pre_lower;
pub use chunks::PruneAllResult;
pub use calls::CallStats;
pub use calls::DeadFunction;
pub use calls::DeadConfidence;
pub use calls::FunctionCallStats;
pub use types::TypeEdgeStats;
pub use types::TypeGraph;
pub use types::TypeUsage;
pub use search::set_rrf_k_from_config;
/// Strips FTS5 query syntax from free-form user text so it can be embedded in
/// a MATCH expression without being interpreted as operators.
///
/// Removes the boolean operator words `OR`, `AND`, `NOT`, and `NEAR` (exact,
/// case-sensitive matches only) and the special characters
/// `" * ( ) + - ^ : { }` from every remaining word. Words that become empty
/// after filtering are dropped entirely, so the result is a single-space-joined
/// list of cleaned words (or the empty string when nothing survives).
pub(crate) fn sanitize_fts_query(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for word in s
        .split_whitespace()
        .filter(|w| !matches!(*w, "OR" | "AND" | "NOT" | "NEAR"))
    {
        let cleaned: String = word
            .chars()
            .filter(|c| !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':' | '{' | '}'))
            .collect();
        // Skip words that were entirely special characters. Previously such
        // words still emitted a separator, producing doubled (and trailing)
        // spaces; skipping them also makes the final trim+realloc unnecessary.
        if cleaned.is_empty() {
            continue;
        }
        if !out.is_empty() {
            out.push(' ');
        }
        out.push_str(&cleaned);
    }
    out
}
/// Handle to the SQLite-backed index: a connection pool plus the Tokio runtime
/// used to drive sqlx's async API from synchronous callers.
pub struct Store {
    pub(crate) pool: SqlitePool,
    // Runtime owned by the store so synchronous methods can `block_on` queries.
    pub(crate) rt: Runtime,
    // Embedding dimensionality (from metadata, else crate::EMBEDDING_DIM).
    pub(crate) dim: usize,
    // Set by `close()` so `Drop` can skip the redundant WAL checkpoint.
    closed: AtomicBool,
    // Invalidatable cache of note summaries.
    notes_summaries_cache: RwLock<Option<Arc<Vec<NoteSummary>>>>,
    // OnceLock caches: filled at most once per Store lifetime and NOT
    // invalidated by subsequent writes (asserted by a test in this file).
    call_graph_cache: std::sync::OnceLock<std::sync::Arc<CallGraph>>,
    test_chunks_cache: std::sync::OnceLock<std::sync::Arc<Vec<ChunkSummary>>>,
    chunk_type_map_cache: std::sync::OnceLock<std::sync::Arc<ChunkTypeMap>>,
}
/// Lookup table from a chunk's string key to its `(ChunkType, Language)` pair.
/// NOTE(review): the exact key format (chunk identity vs. name) is decided by
/// the code that populates the cache — not visible in this file; confirm there.
pub type ChunkTypeMap =
    std::collections::HashMap<String, (crate::parser::ChunkType, crate::parser::Language)>;
/// Tuning knobs shared by the different `Store::open*` entry points.
struct StoreOpenConfig {
    // Open the database read-only (no create-if-missing).
    read_only: bool,
    // Use a current-thread Tokio runtime instead of a multi-threaded one.
    use_current_thread: bool,
    // Pool size; also reused as the runtime's worker-thread count.
    max_connections: u32,
    // Value for `PRAGMA mmap_size`, in bytes (passed through as a string).
    mmap_size: &'static str,
    // Value for `PRAGMA cache_size`; per SQLite docs, negative means KiB.
    cache_size: &'static str,
}
impl Store {
    /// Returns the embedding dimensionality this store operates with.
    pub fn dim(&self) -> usize {
        self.dim
    }

    /// Overrides the embedding dimensionality.
    pub fn set_dim(&mut self, dim: usize) {
        self.dim = dim;
    }

    /// Opens (creating if missing) a read-write store: multi-threaded runtime,
    /// 4-connection pool, 256 MiB mmap, 16 MiB page cache.
    pub fn open(path: &Path) -> Result<Self, StoreError> {
        Self::open_with_config(
            path,
            StoreOpenConfig {
                read_only: false,
                use_current_thread: false,
                max_connections: 4,
                mmap_size: "268435456",
                cache_size: "-16384",
            },
        )
    }

    /// Opens a read-only store keeping the same large mmap/cache settings as
    /// `open`. NOTE(review): despite the name, this currently uses a single
    /// connection on a current-thread runtime, just like `open_readonly`.
    pub fn open_readonly_pooled(path: &Path) -> Result<Self, StoreError> {
        Self::open_with_config(
            path,
            StoreOpenConfig {
                read_only: true,
                use_current_thread: true,
                max_connections: 1,
                mmap_size: "268435456",
                cache_size: "-16384",
            },
        )
    }

    /// Opens a lightweight read-only store: single connection, 64 MiB mmap,
    /// 4 MiB page cache.
    pub fn open_readonly(path: &Path) -> Result<Self, StoreError> {
        Self::open_with_config(
            path,
            StoreOpenConfig {
                read_only: true,
                use_current_thread: true,
                max_connections: 1,
                mmap_size: "67108864",
                cache_size: "-4096",
            },
        )
    }

    /// Shared open path: builds the runtime and pool, applies pragmas,
    /// tightens file permissions (unix, writable opens only), runs a quick
    /// integrity check, loads the persisted dimensionality, and validates
    /// schema/version metadata.
    fn open_with_config(path: &Path, config: StoreOpenConfig) -> Result<Self, StoreError> {
        let mode = if config.read_only { "readonly" } else { "open" };
        let _span = tracing::info_span!("store_open", %mode, path = %path.display()).entered();
        // The store drives async sqlx from sync code, so it owns its runtime;
        // read-only opens get a cheaper current-thread runtime.
        let rt = if config.use_current_thread {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()?
        } else {
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(config.max_connections as usize)
                .enable_all()
                .build()?
        };
        // WAL journal + NORMAL synchronous; busy_timeout lets concurrent
        // opens wait up to 5s instead of failing immediately on a locked db.
        let mut connect_opts = SqliteConnectOptions::new()
            .filename(path)
            .foreign_keys(true)
            .journal_mode(SqliteJournalMode::Wal)
            .busy_timeout(std::time::Duration::from_secs(5))
            .synchronous(SqliteSynchronous::Normal)
            .pragma("mmap_size", config.mmap_size)
            .log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_secs(5));
        if config.read_only {
            connect_opts = connect_opts.read_only(true);
        } else {
            connect_opts = connect_opts.create_if_missing(true);
        }
        // cache_size and temp_store are per-connection settings, so they are
        // (re)applied in after_connect for every new pool connection.
        let cache_pragma = format!("PRAGMA cache_size = {}", config.cache_size);
        let pool = rt.block_on(async {
            SqlitePoolOptions::new()
                .max_connections(config.max_connections)
                .idle_timeout(std::time::Duration::from_secs(30))
                .after_connect(move |conn, _meta| {
                    let pragma = cache_pragma.clone();
                    Box::pin(async move {
                        sqlx::query(&pragma).execute(&mut *conn).await?;
                        sqlx::query("PRAGMA temp_store = MEMORY")
                            .execute(&mut *conn)
                            .await?;
                        Ok(())
                    })
                })
                .connect_with(connect_opts)
                .await
        })?;
        // Best-effort: restrict the db and its WAL/SHM sidecars to the owning
        // user; failures are only logged (the sidecars may not exist yet).
        #[cfg(unix)]
        if !config.read_only {
            use std::os::unix::fs::PermissionsExt;
            let restrictive = std::fs::Permissions::from_mode(0o600);
            if let Err(e) = std::fs::set_permissions(path, restrictive.clone()) {
                tracing::debug!(path = %path.display(), error = %e, "Failed to set permissions");
            }
            // NOTE(review): `with_extension` only matches SQLite's `<file>-wal`
            // naming when the db file ends in `.db` ("index.db" -> "index.db-wal");
            // for any other extension these paths would be wrong — confirm callers.
            let wal_path = path.with_extension("db-wal");
            let shm_path = path.with_extension("db-shm");
            if let Err(e) = std::fs::set_permissions(&wal_path, restrictive.clone()) {
                tracing::debug!(path = %wal_path.display(), error = %e, "Failed to set permissions");
            }
            if let Err(e) = std::fs::set_permissions(&shm_path, restrictive) {
                tracing::debug!(path = %shm_path.display(), error = %e, "Failed to set permissions");
            }
        }
        tracing::info!(
            path = %path.display(),
            read_only = config.read_only,
            "Database connected"
        );
        // Cheap corruption probe: integrity_check(1) stops at the first error.
        rt.block_on(async {
            let result: (String,) = sqlx::query_as("PRAGMA integrity_check(1)")
                .fetch_one(&pool)
                .await?;
            if result.0 != "ok" {
                return Err(StoreError::Corruption(result.0));
            }
            Ok::<_, StoreError>(())
        })?;
        // Load the persisted dimensionality. A missing metadata table (fresh
        // db), a zero, or an unparsable value all fall back to the compiled-in
        // default rather than failing the open.
        let dim = rt
            .block_on(async {
                let row: Option<(String,)> =
                    match sqlx::query_as("SELECT value FROM metadata WHERE key = 'dimensions'")
                        .fetch_optional(&pool)
                        .await
                    {
                        Ok(r) => r,
                        Err(sqlx::Error::Database(e)) if e.message().contains("no such table") => {
                            return Ok::<_, StoreError>(None);
                        }
                        Err(e) => return Err(e.into()),
                    };
                Ok(match row {
                    Some((s,)) => match s.parse::<u32>() {
                        Ok(0) => {
                            tracing::warn!(raw = %s, "dimensions metadata is 0 — invalid, using default");
                            None
                        }
                        Ok(d) => Some(d as usize),
                        Err(e) => {
                            tracing::warn!(raw = %s, error = %e, "dimensions metadata is not a valid integer, using default");
                            None
                        }
                    },
                    None => None,
                })
            })?
            .unwrap_or(crate::EMBEDDING_DIM);
        let store = Self {
            pool,
            rt,
            dim,
            closed: AtomicBool::new(false),
            notes_summaries_cache: RwLock::new(None),
            call_graph_cache: std::sync::OnceLock::new(),
            test_chunks_cache: std::sync::OnceLock::new(),
            chunk_type_map_cache: std::sync::OnceLock::new(),
        };
        store.check_schema_version(path)?;
        store.check_cq_version();
        Ok(store)
    }

    /// Acquires the global write mutex, then begins a transaction. Returning
    /// the guard alongside the transaction keeps all writers in this process
    /// serialized for the transaction's lifetime.
    pub(crate) async fn begin_write(
        &self,
    ) -> Result<
        (
            std::sync::MutexGuard<'static, ()>,
            sqlx::Transaction<'_, sqlx::Sqlite>,
        ),
        sqlx::Error,
    > {
        // A poisoned lock only means another writer panicked; the () payload
        // carries no state, so recovering the guard is safe.
        let guard = WRITE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let tx = self.pool.begin().await?;
        Ok((guard, tx))
    }

    /// Creates the schema and seeds the metadata table (schema version, model
    /// name/dimensions, creation time, cq version) in a single transaction.
    /// Idempotent: re-running replaces the metadata rows (INSERT OR REPLACE).
    pub fn init(&self, model_info: &ModelInfo) -> Result<(), StoreError> {
        let _span = tracing::info_span!("Store::init").entered();
        self.rt.block_on(async {
            let (_guard, mut tx) = self.begin_write().await?;
            let schema = include_str!("../schema.sql");
            // sqlx executes one statement at a time, so split the schema on
            // ';' and strip leading blank/comment lines from each piece.
            // NOTE(review): this naive split assumes schema.sql contains no
            // ';' inside string literals or trigger bodies — confirm.
            for statement in schema.split(';') {
                let stmt: String = statement
                    .lines()
                    .skip_while(|line| {
                        let trimmed = line.trim();
                        trimmed.is_empty() || trimmed.starts_with("--")
                    })
                    .collect::<Vec<_>>()
                    .join("\n");
                let stmt = stmt.trim();
                if stmt.is_empty() {
                    continue;
                }
                sqlx::query(stmt).execute(&mut *tx).await?;
            }
            let now = chrono::Utc::now().to_rfc3339();
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("schema_version")
                .bind(CURRENT_SCHEMA_VERSION.to_string())
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("model_name")
                .bind(&model_info.name)
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("dimensions")
                .bind(model_info.dimensions.to_string())
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("created_at")
                .bind(&now)
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("cq_version")
                .bind(env!("CARGO_PKG_VERSION"))
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            tracing::info!(
                schema_version = CURRENT_SCHEMA_VERSION,
                "Schema initialized"
            );
            Ok(())
        })
    }

    /// Checkpoints the WAL into the main database file and closes the pool.
    /// Marks the store closed so the subsequent `Drop` skips its checkpoint.
    pub fn close(self) -> Result<(), StoreError> {
        self.closed.store(true, Ordering::Release);
        self.rt.block_on(async {
            sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
                .execute(&self.pool)
                .await?;
            tracing::debug!("WAL checkpoint completed");
            self.pool.close().await;
            Ok(())
        })
    }
}
impl Drop for Store {
    /// Best-effort WAL checkpoint on drop; skipped when `close()` already ran
    /// one. Wrapped in `catch_unwind` so a failing checkpoint can never turn
    /// a drop into a panic (e.g. while already unwinding).
    fn drop(&mut self) {
        if self.closed.load(Ordering::Acquire) {
            return;
        }
        let checkpoint = || {
            let outcome = self
                .rt
                .block_on(sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)").execute(&self.pool));
            if let Err(e) = outcome {
                tracing::warn!(error = %e, "WAL checkpoint on drop failed (non-fatal)");
            }
        };
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(checkpoint));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::nl::normalize_for_fts;
    use proptest::prelude::*;

    proptest! {
        // The normalizer must never panic on arbitrary printable input.
        #[test]
        fn fuzz_normalize_for_fts_no_panic(input in "\\PC{0,500}") {
            let _ = normalize_for_fts(&input);
        }

        // Output alphabet is restricted to alphanumerics, spaces, underscores.
        #[test]
        fn fuzz_normalize_for_fts_safe_output(input in "\\PC{0,200}") {
            let result = normalize_for_fts(&input);
            for c in result.chars() {
                prop_assert!(
                    c.is_alphanumeric() || c == ' ' || c == '_',
                    "Unexpected char '{}' (U+{:04X}) in output: {}",
                    c, c as u32, result
                );
            }
        }

        // FTS5 metacharacters embedded inside words must be stripped.
        #[test]
        fn fuzz_normalize_for_fts_special_chars(
            prefix in "[a-z]{0,10}",
            special in prop::sample::select(vec!['*', '"', ':', '^', '(', ')', '-', '+']),
            suffix in "[a-z]{0,10}"
        ) {
            let input = format!("{}{}{}", prefix, special, suffix);
            let result = normalize_for_fts(&input);
            prop_assert!(
                !result.contains(special),
                "Special char '{}' should be stripped from: {} -> {}",
                special, input, result
            );
        }

        // Normalization must not blow up output size (4x bounds UTF-8 width).
        #[test]
        fn fuzz_normalize_for_fts_unicode(input in "[\\p{L}\\p{N}\\s]{0,100}") {
            let result = normalize_for_fts(&input);
            prop_assert!(result.len() <= input.len() * 4);
        }

        // sanitize_fts_query strips every FTS5 special character.
        #[test]
        fn prop_sanitize_no_special_chars(input in "\\PC{0,500}") {
            let result = sanitize_fts_query(&input);
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "FTS5 special char '{}' in sanitized output: {}",
                    c, result
                );
            }
        }

        // No FTS5 boolean operator word survives as a standalone token.
        #[test]
        fn prop_sanitize_no_operators(input in "\\PC{0,300}") {
            let result = sanitize_fts_query(&input);
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "FTS5 operator '{}' survived sanitization: {}",
                    word, result
                );
            }
        }

        // The normalize -> sanitize pipeline is safe end to end.
        #[test]
        fn prop_pipeline_safe(input in "\\PC{0,300}") {
            let result = sanitize_fts_query(&normalize_for_fts(&input));
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "Special char '{}' in pipeline output: {}",
                    c, result
                );
            }
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "Operator '{}' in pipeline output: {}",
                    word, result
                );
            }
        }

        // Input built purely from special characters sanitizes to nothing.
        #[test]
        fn prop_sanitize_all_special(
            chars in prop::collection::vec(
                prop::sample::select(vec!['"', '*', '(', ')', '+', '-', '^', ':']),
                1..50
            )
        ) {
            let input: String = chars.into_iter().collect();
            let result = sanitize_fts_query(&input);
            prop_assert!(
                result.is_empty(),
                "All-special input should produce empty output, got: {}",
                result
            );
        }

        // Operators are dropped while surrounding plain words are preserved.
        #[test]
        fn prop_sanitize_operators_removed(
            pre in "[a-z]{1,10}",
            op in prop::sample::select(vec!["OR", "AND", "NOT", "NEAR"]),
            post in "[a-z]{1,10}"
        ) {
            let input = format!("{} {} {}", pre, op, post);
            let result = sanitize_fts_query(&input);
            prop_assert!(
                !result.split_whitespace().any(|w| w == op),
                "Operator '{}' not stripped from: {} -> {}",
                op, input, result
            );
            prop_assert!(result.contains(&pre), "Pre-text '{}' missing from: {}", pre, result);
            prop_assert!(result.contains(&post), "Post-text '{}' missing from: {}", post, result);
        }

        // Combined specials + operators: output must still be fully clean.
        #[test]
        fn prop_sanitize_adversarial(
            normal in "[a-z]{1,10}",
            special in prop::sample::select(vec!['"', '*', '(', ')', '+', '-', '^', ':']),
            op in prop::sample::select(vec!["OR", "AND", "NOT", "NEAR"]),
        ) {
            let input = format!("{}{} {} {}{}", special, normal, op, normal, special);
            let result = sanitize_fts_query(&input);
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "Special char '{}' in adversarial output: {}",
                    c, result
                );
            }
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "Operator '{}' in adversarial output: {}",
                    word, result
                );
            }
        }
    }

    /// Opens a fresh read-write store in a temp dir and initializes its schema.
    /// The TempDir must be kept alive by the caller for the store's lifetime.
    fn make_test_store_initialized() -> (Store, tempfile::TempDir) {
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join("index.db");
        let store = Store::open(&db_path).unwrap();
        store.init(&ModelInfo::default()).unwrap();
        (store, dir)
    }

    // Two read-only handles can coexist on the same database file.
    #[test]
    fn concurrent_readonly_opens() {
        let (_writer, dir) = make_test_store_initialized();
        let db_path = dir.path().join("index.db");
        let ro1 = Store::open_readonly(&db_path).expect("first readonly open failed");
        let ro2 = Store::open_readonly(&db_path).expect("second readonly open failed");
        assert!(ro1.check_model_version().is_ok());
        assert!(ro2.check_model_version().is_ok());
    }

    // A read-only open must succeed while a writer handle is still alive
    // (WAL mode allows concurrent readers).
    #[test]
    fn readonly_open_while_writer_holds() {
        let (writer, dir) = make_test_store_initialized();
        let db_path = dir.path().join("index.db");
        let ro = Store::open_readonly(&db_path).expect("readonly open failed while writer active");
        assert!(ro.check_model_version().is_ok());
        drop(writer);
    }

    // Renamed from `onclock_cache_not_invalidated_by_writes` (typo): documents
    // that the OnceLock-backed call-graph cache is filled once per Store
    // lifetime and deliberately NOT invalidated by later writes.
    #[test]
    fn oncelock_cache_not_invalidated_by_writes() {
        let (store, _dir) = make_test_store_initialized();
        let graph_before = store.get_call_graph().expect("first get_call_graph failed");
        let callers_before = graph_before.forward.len();
        store
            .upsert_function_calls(
                std::path::Path::new("test.rs"),
                &[crate::parser::FunctionCalls {
                    name: "caller".to_string(),
                    line_start: 1,
                    calls: vec![crate::parser::CallSite {
                        callee_name: "callee".to_string(),
                        line_number: 2,
                    }],
                }],
            )
            .unwrap();
        let graph_after = store
            .get_call_graph()
            .expect("second get_call_graph failed");
        assert_eq!(
            graph_after.forward.len(),
            callers_before,
            "OnceLock cache should not be invalidated by writes within the same Store lifetime"
        );
    }

    // init() uses re-runnable DDL and INSERT OR REPLACE, so calling it twice
    // on the same database must succeed.
    #[test]
    fn double_init_is_idempotent() {
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join("index.db");
        let store = Store::open(&db_path).unwrap();
        store
            .init(&ModelInfo::default())
            .expect("first init() failed");
        store
            .init(&ModelInfo::default())
            .expect("second init() should be idempotent but failed");
    }
}