#![allow(clippy::await_holding_lock)]
mod backup;
pub mod calls;
mod chunks;
mod metadata;
mod migrations;
mod notes;
mod search;
mod sparse;
mod types;
pub(crate) mod helpers;
use std::marker::PhantomData;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
/// Process-wide writer serialization: every write transaction acquires this
/// lock in `begin_write` before `pool.begin()`, so at most one writer from
/// this process contends for SQLite's single WAL writer slot.
static WRITE_LOCK: Mutex<()> = Mutex::new(());
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteSynchronous};
use sqlx::{ConnectOptions, SqlitePool};
use tokio::runtime::Runtime;
pub use calls::cross_project::{
CrossProjectCallee, CrossProjectCaller, CrossProjectContext, NamedStore,
};
pub use helpers::CallGraph;
pub use helpers::CallerInfo;
pub use helpers::CallerWithContext;
pub use helpers::ChunkIdentity;
pub use helpers::ChunkSummary;
pub use helpers::ParentContext;
pub use helpers::IndexStats;
pub use helpers::ModelInfo;
pub use helpers::NoteSearchResult;
pub use helpers::NoteStats;
pub use helpers::NoteSummary;
pub use helpers::SearchFilter;
pub use helpers::SearchResult;
pub use helpers::StaleFile;
pub use helpers::StaleReport;
pub use helpers::StoreError;
pub use helpers::UnifiedResult;
pub use helpers::CURRENT_SCHEMA_VERSION;
pub use metadata::HnswKind;
/// Model repository the index's embeddings are expected to come from.
pub const MODEL_NAME: &str = crate::embedder::DEFAULT_MODEL_REPO;
/// Embedding vector width the store expects by default.
pub const EXPECTED_DIMENSIONS: usize = crate::EMBEDDING_DIM;
pub use helpers::DEFAULT_NAME_BOOST;
pub use helpers::score_name_match;
pub use helpers::score_name_match_pre_lower;
pub use chunks::PruneAllResult;
pub use calls::CallStats;
pub use calls::DeadFunction;
pub use calls::DeadConfidence;
pub use calls::FunctionCallStats;
pub use types::TypeEdgeStats;
pub use types::TypeGraph;
pub use types::TypeUsage;
pub use search::set_rrf_k_from_config;
/// Sanitizes a raw query string for safe use inside an FTS5 MATCH expression.
///
/// Drops the FTS5 boolean operators (`OR`, `AND`, `NOT`, `NEAR`) as whole
/// words and strips FTS5 syntax characters (`"*()+-^:{}`) from the remaining
/// words. Words that become empty after stripping are omitted entirely, so
/// the output never contains doubled or stray separators (previously
/// `"foo ** bar"` produced `"foo  bar"`). Returns an empty string when
/// nothing survives.
pub(crate) fn sanitize_fts_query(s: &str) -> String {
    let words: Vec<String> = s
        .split_whitespace()
        // Operator keywords are removed as whole tokens, not per-character.
        .filter(|w| !matches!(*w, "OR" | "AND" | "NOT" | "NEAR"))
        .map(|word| {
            word.chars()
                .filter(|c| {
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':' | '{' | '}')
                })
                .collect::<String>()
        })
        // A word made entirely of special chars must not leave a separator.
        .filter(|w| !w.is_empty())
        .collect();
    words.join(" ")
}
/// Zero-sized marker: the store was opened for reading only.
#[derive(Debug, Clone, Copy)]
pub struct ReadOnly;
/// Zero-sized marker: the store was opened for reading and writing (default).
#[derive(Debug, Clone, Copy)]
pub struct ReadWrite;
/// Handle to the SQLite-backed index.
///
/// `Mode` is a zero-sized marker ([`ReadOnly`] or [`ReadWrite`], defaulting
/// to [`ReadWrite`]) that selects which impl blocks are available; it is
/// carried only in `PhantomData` and stores no data.
pub struct Store<Mode = ReadWrite> {
    pub(crate) pool: SqlitePool,
    /// Tokio runtime used to drive async sqlx calls from synchronous code.
    pub(crate) rt: Arc<Runtime>,
    /// Embedding dimensionality (loaded from the `dimensions` metadata row,
    /// or the compiled-in default).
    pub(crate) dim: usize,
    /// Set by `close()` so `Drop` skips its own WAL checkpoint.
    closed: AtomicBool,
    /// Lazily populated caches; all reset by `clear_caches`.
    notes_summaries_cache: RwLock<Option<Arc<Vec<NoteSummary>>>>,
    note_boost_cache: RwLock<Option<Arc<crate::search::scoring::OwnedNoteBoostIndex>>>,
    // OnceLock caches fill once per Store lifetime and are NOT invalidated
    // by writes performed through the same handle.
    call_graph_cache: std::sync::OnceLock<std::sync::Arc<CallGraph>>,
    test_chunks_cache: std::sync::OnceLock<std::sync::Arc<Vec<ChunkSummary>>>,
    chunk_type_map_cache: std::sync::OnceLock<std::sync::Arc<ChunkTypeMap>>,
    _mode: PhantomData<Mode>,
}
/// Maps a chunk identifier to its parsed chunk type and source language.
pub type ChunkTypeMap =
    std::collections::HashMap<String, (crate::parser::ChunkType, crate::parser::Language)>;
/// Knobs for opening a store; the `Store::open*` constructors build presets.
struct StoreOpenConfig {
    /// Open SQLite read-only (otherwise `create_if_missing` is enabled).
    read_only: bool,
    /// Build a current-thread Tokio runtime instead of a multi-thread one.
    use_current_thread: bool,
    /// SQLite pool size; also sizes the multi-thread runtime's worker count.
    max_connections: u32,
    /// Value for `PRAGMA mmap_size`, in bytes as a string ("0" disables mmap).
    mmap_size: String,
    /// Value for `PRAGMA cache_size` (negative values mean KiB, per SQLite).
    cache_size: String,
    /// Reuse an existing runtime instead of constructing a new one.
    runtime: Option<Arc<Runtime>>,
}
/// Filesystem types where SQLite mmap I/O is known to be slow or unreliable:
/// network and translation layers (WSL drvfs/9p, SMB/CIFS, NFS, NTFS drivers).
const SLOW_MMAP_FSTYPES: &[&str] = &[
    "9p",
    "cifs",
    "smb3",
    "smbfs",
    "drvfs",
    "fuse.drvfs",
    "nfs",
    "nfs4",
    "ntfs",
    "ntfs3",
    "fuseblk",
];
/// Reads the `CQS_MMAP_SIZE` environment override.
///
/// Returns the value re-serialized through `u64` when it parses as a
/// non-negative integer (whitespace trimmed); `None` when the variable is
/// unset or malformed.
fn mmap_size_env_override() -> Option<String> {
    let raw = std::env::var("CQS_MMAP_SIZE").ok()?;
    let bytes: u64 = raw.trim().parse().ok()?;
    Some(bytes.to_string())
}
/// Resolves the `PRAGMA mmap_size` value to use for `db_path`.
///
/// Precedence: an explicit `CQS_MMAP_SIZE` override always wins; otherwise
/// mmap is disabled ("0") on filesystems where it is known to be slow;
/// otherwise `default_bytes` is used as-is.
fn resolve_mmap_size(default_bytes: &str, db_path: &Path) -> String {
    match mmap_size_env_override() {
        Some(explicit) => explicit,
        None if is_slow_mmap_fs(db_path) => {
            tracing::info!(
                path = %db_path.display(),
                "Slow FS detected (WSL/9P/NTFS/SMB/NFS), disabling SQLite mmap (set CQS_MMAP_SIZE to override)"
            );
            "0".to_string()
        }
        None => default_bytes.to_string(),
    }
}
/// Heuristic: does `path` live on a filesystem where SQLite mmap is slow?
///
/// Unix only: canonicalizes the path (falling back to its parent, since the
/// DB file may not exist yet on first open), then looks up the owning
/// mount's fstype from `/proc/self/mountinfo` and matches it against
/// [`SLOW_MMAP_FSTYPES`]. Every failure along the way conservatively
/// answers `false` (leave mmap enabled). Always `false` on non-unix.
fn is_slow_mmap_fs(path: &Path) -> bool {
    #[cfg(unix)]
    {
        // Canonicalize the file itself, or its parent directory when the
        // file does not exist yet.
        let canonical = dunce::canonicalize(path).or_else(|_| {
            path.parent().map_or_else(
                || {
                    Err(std::io::Error::new(
                        std::io::ErrorKind::NotFound,
                        "no parent",
                    ))
                },
                dunce::canonicalize,
            )
        });
        let abs = match canonical {
            Ok(p) => p,
            Err(_) => return false,
        };
        // NOTE(review): /proc/self/mountinfo is Linux-specific; on other
        // unix targets the read fails and we fall through to `false`.
        let mountinfo = match std::fs::read_to_string("/proc/self/mountinfo") {
            Ok(c) => c,
            Err(_) => return false,
        };
        match fstype_for_path(&mountinfo, &abs) {
            Some(fstype) => SLOW_MMAP_FSTYPES.iter().any(|&s| s == fstype),
            None => false,
        }
    }
    #[cfg(not(unix))]
    {
        let _ = path;
        false
    }
}
/// Returns the filesystem type owning `abs_path`, by longest mount-point
/// prefix match over `/proc/self/mountinfo` content.
///
/// Per `proc(5)`, each mountinfo line has the shape
/// `ID parent major:minor root mount_point opts [tags...] - fstype source opts`;
/// the " - " separator splits the variable-length tag list from the fstype.
/// The kernel octal-escapes special characters in the mount point
/// (space `\040`, tab `\011`, newline `\012`, backslash `\134`); all four
/// are decoded before matching (previously only `\040` was).
fn fstype_for_path(mountinfo: &str, abs_path: &Path) -> Option<String> {
    let abs_str = abs_path.to_str()?;
    // (mount-point length, fstype) of the longest matching mount seen so far.
    let mut best: Option<(usize, String)> = None;
    for line in mountinfo.lines() {
        let (left, right) = match line.split_once(" - ") {
            Some(parts) => parts,
            None => continue,
        };
        let left_fields: Vec<&str> = left.split_whitespace().collect();
        if left_fields.len() < 5 {
            continue;
        }
        // Field 5 (index 4) is the mount point.
        let mp = unescape_mountinfo_field(left_fields[4]);
        if !path_starts_with(abs_str, &mp) {
            continue;
        }
        // First field after " - " is the filesystem type.
        let fstype = match right.split_whitespace().next() {
            Some(t) => t,
            None => continue,
        };
        let len = mp.len();
        if best.as_ref().is_none_or(|(blen, _)| len > *blen) {
            best = Some((len, fstype.to_string()));
        }
    }
    best.map(|(_, fs)| fs)
}

/// Decodes the kernel's octal escapes in a mountinfo field.
///
/// `\134` (backslash) must be decoded last: a literal backslash followed by
/// literal "040" is escaped by the kernel as `\134040`, and decoding the
/// backslash first would wrongly re-interpret the result as an escape.
fn unescape_mountinfo_field(field: &str) -> String {
    field
        .replace("\\040", " ")
        .replace("\\011", "\t")
        .replace("\\012", "\n")
        .replace("\\134", "\\")
}
/// Component-boundary prefix test: is `prefix` an ancestor of (or equal to)
/// `path`?
///
/// Unlike a raw `starts_with`, `/mnt/c` does NOT match `/mnt/ca` — after the
/// prefix, the path must either end or continue with a `/`. The root `/`
/// matches every absolute path.
fn path_starts_with(path: &str, prefix: &str) -> bool {
    if prefix == "/" {
        return path.starts_with('/');
    }
    match path.strip_prefix(prefix) {
        Some(rest) => rest.is_empty() || rest.starts_with('/'),
        None => false,
    }
}
#[cfg(test)]
mod slow_fs_tests {
    //! Tests for slow-filesystem detection: mountinfo parsing
    //! (`fstype_for_path`), prefix matching (`path_starts_with`), and the
    //! mmap-size resolution precedence.
    use super::{fstype_for_path, path_starts_with, SLOW_MMAP_FSTYPES};
    use std::path::Path;
    // Sample mountinfo resembling a WSL2 session: ext4 root plus 9p (drvfs)
    // mounts for the Windows drives. `\\134` is the mountinfo octal escape
    // for a backslash in the mount source field.
    const WSL_MOUNTINFO: &str = "\
75 80 0:29 / /usr/lib/modules/6.6.87 rw,nosuid,nodev,noatime - overlay none rw,lowerdir=/modules
80 65 8:48 / / rw,relatime - ext4 /dev/sdd rw,discard,errors=remount-ro
111 80 0:22 / /sys rw,nosuid,nodev,noexec,noatime shared:15 - sysfs sysfs rw
112 80 0:59 / /proc rw,nosuid,nodev,noexec,noatime shared:16 - proc proc rw
135 80 0:73 / /mnt/c rw,noatime - 9p C:\\134 rw,aname=drvfs;path=C:\\
136 80 0:74 / /mnt/d rw,noatime - 9p D:\\134 rw,aname=drvfs;path=D:\\
";
    #[test]
    fn wsl_mnt_c_detected_as_9p() {
        let fs = fstype_for_path(
            WSL_MOUNTINFO,
            Path::new("/mnt/c/Projects/cqs/.cqs/index.db"),
        );
        assert_eq!(fs.as_deref(), Some("9p"));
        assert!(SLOW_MMAP_FSTYPES.contains(&"9p"));
    }
    #[test]
    fn wsl_home_detected_as_ext4_not_slow() {
        let fs = fstype_for_path(
            WSL_MOUNTINFO,
            Path::new("/home/user001/project/.cqs/index.db"),
        );
        assert_eq!(fs.as_deref(), Some("ext4"));
        assert!(!SLOW_MMAP_FSTYPES.contains(&"ext4"));
    }
    #[test]
    fn wsl_mnt_c_root_itself_matches() {
        let fs = fstype_for_path(WSL_MOUNTINFO, Path::new("/mnt/c"));
        assert_eq!(fs.as_deref(), Some("9p"));
    }
    // /mnt/ca must fall through to the root ext4 mount, not match /mnt/c.
    #[test]
    fn sibling_mount_not_matched() {
        let info = "135 80 0:73 / /mnt/c rw,noatime - 9p C:\\134 rw\n80 65 8:48 / / rw - ext4 /dev/sdd rw\n";
        let fs = fstype_for_path(info, Path::new("/mnt/ca/file"));
        assert_eq!(fs.as_deref(), Some("ext4"));
    }
    #[test]
    fn longest_prefix_wins_for_nested_mounts() {
        let info = "\
80 65 8:48 / / rw - ext4 /dev/sdd rw
135 80 0:73 / /mnt/c rw - 9p C:\\134 rw
200 135 0:99 / /mnt/c/WINDOWS rw - cifs //host/share rw
";
        let fs = fstype_for_path(info, Path::new("/mnt/c/WINDOWS/system32/a.db"));
        assert_eq!(fs.as_deref(), Some("cifs"));
    }
    // mountinfo may carry optional tags (shared:N, master:N) before " - ".
    #[test]
    fn handles_optional_fields_with_shared_tag() {
        let info = "\
76 80 0:32 / /some/mount rw,relatime shared:1 master:2 - 9p foo rw\n";
        let fs = fstype_for_path(info, Path::new("/some/mount/file"));
        assert_eq!(fs.as_deref(), Some("9p"));
    }
    #[test]
    fn empty_or_garbage_mountinfo_returns_none() {
        assert!(fstype_for_path("", Path::new("/any/path")).is_none());
        assert!(fstype_for_path("not a mountinfo line\n", Path::new("/any/path")).is_none());
        assert!(fstype_for_path("80 65 8:48 / / rw ext4 /dev/sdd rw\n", Path::new("/")).is_none());
    }
    #[test]
    fn native_linux_ext4_is_not_slow() {
        let info = "80 65 8:48 / / rw,relatime - ext4 /dev/sda1 rw\n";
        let fs = fstype_for_path(info, Path::new("/home/alice/project/.cqs/index.db"));
        assert_eq!(fs.as_deref(), Some("ext4"));
        assert!(!SLOW_MMAP_FSTYPES.contains(&"ext4"));
    }
    #[test]
    fn nfs_and_cifs_are_slow() {
        let info = "\
80 65 8:48 / / rw - ext4 /dev/sdd rw
90 80 0:50 / /net/nfs rw - nfs server:/export rw
91 80 0:51 / /net/smb rw - cifs //host/share rw
";
        assert_eq!(
            fstype_for_path(info, Path::new("/net/nfs/index.db")).as_deref(),
            Some("nfs")
        );
        assert_eq!(
            fstype_for_path(info, Path::new("/net/smb/index.db")).as_deref(),
            Some("cifs")
        );
        assert!(SLOW_MMAP_FSTYPES.contains(&"nfs"));
        assert!(SLOW_MMAP_FSTYPES.contains(&"cifs"));
    }
    #[test]
    fn path_starts_with_component_boundary() {
        assert!(path_starts_with("/mnt/c/foo", "/mnt/c"));
        assert!(path_starts_with("/mnt/c", "/mnt/c"));
        assert!(!path_starts_with("/mnt/ca", "/mnt/c"));
        assert!(!path_starts_with("/mnt", "/mnt/c"));
        assert!(path_starts_with("/anything", "/"));
        assert!(path_starts_with("/", "/"));
    }
    // Saves and restores the prior env value so concurrent tests that read
    // CQS_MMAP_SIZE are not permanently affected.
    #[test]
    fn resolve_mmap_size_env_override_wins() {
        let prev = std::env::var("CQS_MMAP_SIZE").ok();
        std::env::set_var("CQS_MMAP_SIZE", "12345");
        let got = super::resolve_mmap_size("268435456", Path::new("/mnt/c/any"));
        assert_eq!(got, "12345");
        match prev {
            Some(v) => std::env::set_var("CQS_MMAP_SIZE", v),
            None => std::env::remove_var("CQS_MMAP_SIZE"),
        }
    }
}
/// Resolves the `PRAGMA cache_size` value.
///
/// `CQS_SQLITE_CACHE_SIZE` overrides `default_kib` when it parses as an
/// integer (whitespace trimmed; negative values mean KiB per SQLite).
/// Unset or malformed values fall back to `default_kib`.
fn cache_size_from_env(default_kib: &str) -> String {
    let parsed = match std::env::var("CQS_SQLITE_CACHE_SIZE") {
        Ok(raw) => raw.trim().parse::<i64>().ok(),
        Err(_) => None,
    };
    match parsed {
        Some(n) => n.to_string(),
        None => default_kib.to_string(),
    }
}
impl<Mode> Store<Mode> {
    /// Embedding dimensionality this index was opened with.
    pub fn dim(&self) -> usize {
        self.dim
    }
    /// Overrides the embedding dimensionality.
    /// NOTE(review): callers presumably keep this consistent with the
    /// `dimensions` metadata row — confirm at the call sites.
    pub fn set_dim(&mut self, dim: usize) {
        self.dim = dim;
    }
    /// The Tokio runtime backing this store's blocking database calls.
    pub fn runtime(&self) -> &Arc<Runtime> {
        &self.rt
    }
}
impl Store<ReadWrite> {
    /// Opens (or creates) a read-write store at `path`, building a fresh
    /// multi-thread runtime.
    pub fn open(path: &Path) -> Result<Self, StoreError> {
        let config = Self::default_open_config(path, None);
        Self::open_with_config(path, config)
    }
    /// Opens (or creates) a read-write store at `path`, reusing the
    /// caller-supplied Tokio runtime.
    pub fn open_with_runtime(path: &Path, runtime: Arc<Runtime>) -> Result<Self, StoreError> {
        let config = Self::default_open_config(path, Some(runtime));
        Self::open_with_config(path, config)
    }
    /// Assembles the default read-write open configuration.
    /// `CQS_MAX_CONNECTIONS` overrides the pool size (default 4).
    fn default_open_config(path: &Path, runtime: Option<Arc<Runtime>>) -> StoreOpenConfig {
        let max_connections = match std::env::var("CQS_MAX_CONNECTIONS") {
            Ok(raw) => raw.parse::<u32>().ok().unwrap_or(4),
            Err(_) => 4,
        };
        StoreOpenConfig {
            read_only: false,
            use_current_thread: false,
            max_connections,
            mmap_size: resolve_mmap_size("268435456", path),
            cache_size: cache_size_from_env("-16384"),
            runtime,
        }
    }
}
impl Store<ReadOnly> {
pub fn open_readonly_pooled(path: &Path) -> Result<Self, StoreError> {
open_with_config_impl::<ReadOnly>(
path,
StoreOpenConfig {
read_only: true,
use_current_thread: true,
max_connections: 1,
mmap_size: resolve_mmap_size("268435456", path),
cache_size: cache_size_from_env("-16384"),
runtime: None,
},
)
}
pub fn open_readonly(path: &Path) -> Result<Self, StoreError> {
open_with_config_impl::<ReadOnly>(
path,
StoreOpenConfig {
read_only: true,
use_current_thread: true,
max_connections: 1,
mmap_size: resolve_mmap_size("67108864", path), cache_size: cache_size_from_env("-4096"), runtime: None,
},
)
}
pub fn open_readonly_pooled_with_runtime(
path: &Path,
runtime: Arc<Runtime>,
) -> Result<Self, StoreError> {
open_with_config_impl::<ReadOnly>(
path,
StoreOpenConfig {
read_only: true,
use_current_thread: true,
max_connections: 1,
mmap_size: resolve_mmap_size("268435456", path),
cache_size: cache_size_from_env("-16384"),
runtime: Some(runtime),
},
)
}
pub fn open_readonly_small(path: &Path) -> Result<Self, StoreError> {
open_with_config_impl::<ReadOnly>(
path,
StoreOpenConfig {
read_only: true,
use_current_thread: true,
max_connections: 1,
mmap_size: resolve_mmap_size("16777216", path), cache_size: cache_size_from_env("-1024"), runtime: None,
},
)
}
pub fn open_readonly_after_init<F>(path: &Path, init: F) -> Result<Self, StoreError>
where
F: FnOnce(&Store<ReadWrite>) -> Result<(), StoreError>,
{
let rw = Store::<ReadWrite>::open(path)?;
init(&rw)?;
drop(rw);
Store::<ReadOnly>::open_readonly(path)
}
}
impl Store<ReadWrite> {
    /// Thin typed wrapper over `open_with_config_impl` for read-write opens.
    fn open_with_config(path: &Path, config: StoreOpenConfig) -> Result<Self, StoreError> {
        open_with_config_impl::<ReadWrite>(path, config)
    }
}
/// Shared open routine for both store modes.
///
/// Steps, in order: pick or build a Tokio runtime; connect a SQLite pool
/// with tuned pragmas; tighten file permissions (unix, read-write only);
/// optionally run an integrity check; read the stored embedding dimension
/// from the `metadata` table; then validate schema and cq versions on the
/// constructed [`Store`].
fn open_with_config_impl<Mode>(
    path: &Path,
    config: StoreOpenConfig,
) -> Result<Store<Mode>, StoreError> {
    let mode = if config.read_only { "readonly" } else { "open" };
    let _span = tracing::info_span!("store_open", %mode, path = %path.display()).entered();
    // Runtime precedence: caller-supplied > current-thread > multi-thread
    // (workers sized to the pool's max connections).
    let rt: Arc<Runtime> = if let Some(rt) = config.runtime {
        rt
    } else if config.use_current_thread {
        Arc::new(
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()?,
        )
    } else {
        Arc::new(
            tokio::runtime::Builder::new_multi_thread()
                .worker_threads(config.max_connections as usize)
                .enable_all()
                .build()?,
        )
    };
    // WAL journaling + synchronous=NORMAL; mmap_size arrives pre-resolved
    // (may be "0" to disable mmap on slow filesystems).
    let mut connect_opts = SqliteConnectOptions::new()
        .filename(path)
        .foreign_keys(true)
        .journal_mode(SqliteJournalMode::Wal)
        .busy_timeout(helpers::sql::busy_timeout_from_env(5000))
        .synchronous(SqliteSynchronous::Normal)
        .pragma("mmap_size", config.mmap_size)
        .log_slow_statements(log::LevelFilter::Warn, std::time::Duration::from_secs(5));
    if config.read_only {
        connect_opts = connect_opts.read_only(true);
    } else {
        connect_opts = connect_opts.create_if_missing(true);
    }
    // cache_size and temp_store are per-connection pragmas, so they are
    // applied in after_connect for every connection the pool creates.
    let cache_pragma = format!("PRAGMA cache_size = {}", config.cache_size);
    let pool = rt.block_on(async {
        SqlitePoolOptions::new()
            .max_connections(config.max_connections)
            .idle_timeout(std::time::Duration::from_secs(
                std::env::var("CQS_IDLE_TIMEOUT_SECS")
                    .ok()
                    .and_then(|v| v.parse::<u64>().ok())
                    .unwrap_or(30), ))
            .after_connect(move |conn, _meta| {
                let pragma = cache_pragma.clone();
                Box::pin(async move {
                    sqlx::query(&pragma).execute(&mut *conn).await?;
                    sqlx::query("PRAGMA temp_store = MEMORY")
                        .execute(&mut *conn)
                        .await?;
                    Ok(())
                })
            })
            .connect_with(connect_opts)
            .await
    })?;
    // Best-effort: restrict the DB and its WAL/SHM side files to the owner.
    // NOTE(review): with_extension("db-wal") yields "<stem>.db-wal" only when
    // the file ends in ".db" — confirm for other filenames.
    #[cfg(unix)]
    if !config.read_only {
        use std::os::unix::fs::PermissionsExt;
        let restrictive = std::fs::Permissions::from_mode(0o600);
        if let Err(e) = std::fs::set_permissions(path, restrictive.clone()) {
            tracing::debug!(path = %path.display(), error = %e, "Failed to set permissions");
        }
        let wal_path = path.with_extension("db-wal");
        let shm_path = path.with_extension("db-shm");
        if let Err(e) = std::fs::set_permissions(&wal_path, restrictive.clone()) {
            tracing::debug!(path = %wal_path.display(), error = %e, "Failed to set permissions");
        }
        if let Err(e) = std::fs::set_permissions(&shm_path, restrictive) {
            tracing::debug!(path = %shm_path.display(), error = %e, "Failed to set permissions");
        }
    }
    tracing::info!(
        path = %path.display(),
        read_only = config.read_only,
        "Database connected"
    );
    // Integrity check is opt-in (CQS_INTEGRITY_CHECK=1), force-skippable,
    // and never runs for read-only opens.
    let opt_in = std::env::var("CQS_INTEGRITY_CHECK").as_deref() == Ok("1");
    let force_skip = std::env::var("CQS_SKIP_INTEGRITY_CHECK").as_deref() == Ok("1");
    let run_check = opt_in && !force_skip && !config.read_only;
    if config.read_only {
        tracing::debug!("Skipping integrity check (read-only open)");
    } else if !run_check {
        tracing::debug!("Integrity check skipped (set CQS_INTEGRITY_CHECK=1 to enable)");
    }
    if run_check {
        rt.block_on(async {
            let result: (String,) = sqlx::query_as("PRAGMA quick_check(1)")
                .fetch_one(&pool)
                .await?;
            if result.0 != "ok" {
                return Err(StoreError::Corruption(result.0));
            }
            Ok::<_, StoreError>(())
        })?;
    }
    // Load the embedding dimension recorded at init time. A missing metadata
    // table (fresh DB), missing row, zero, or unparsable value all fall back
    // to the compiled-in default.
    let dim = rt
        .block_on(async {
            let row: Option<(String,)> =
                match sqlx::query_as("SELECT value FROM metadata WHERE key = 'dimensions'")
                    .fetch_optional(&pool)
                    .await
                {
                    Ok(r) => r,
                    Err(sqlx::Error::Database(e)) if e.message().contains("no such table") => {
                        return Ok::<_, StoreError>(None);
                    }
                    Err(e) => return Err(e.into()),
                };
            Ok(match row {
                Some((s,)) => match s.parse::<u32>() {
                    Ok(0) => {
                        tracing::warn!(raw = %s, "dimensions metadata is 0 — invalid, using default");
                        None
                    }
                    Ok(d) => Some(d as usize),
                    Err(e) => {
                        tracing::warn!(raw = %s, error = %e, "dimensions metadata is not a valid integer, using default");
                        None
                    }
                },
                None => None,
            })
        })?
        .unwrap_or(crate::EMBEDDING_DIM);
    let store: Store<Mode> = Store {
        pool,
        rt,
        dim,
        closed: AtomicBool::new(false),
        notes_summaries_cache: RwLock::new(None),
        note_boost_cache: RwLock::new(None),
        call_graph_cache: std::sync::OnceLock::new(),
        test_chunks_cache: std::sync::OnceLock::new(),
        chunk_type_map_cache: std::sync::OnceLock::new(),
        _mode: PhantomData,
    };
    store.check_schema_version(path)?;
    store.check_cq_version();
    Ok(store)
}
impl Store<ReadWrite> {
    /// Opens a write transaction, first acquiring the process-wide
    /// [`WRITE_LOCK`] so only one writer runs at a time in this process.
    ///
    /// The guard is deliberately held across `.await` (hence the file-level
    /// `allow(clippy::await_holding_lock)`); the caller keeps it alive for
    /// the duration of the transaction. A poisoned lock is recovered with
    /// `into_inner`, since the protected state is just `()`.
    pub(crate) async fn begin_write(
        &self,
    ) -> Result<
        (
            std::sync::MutexGuard<'static, ()>,
            sqlx::Transaction<'_, sqlx::Sqlite>,
        ),
        sqlx::Error,
    > {
        let _span = tracing::debug_span!("begin_write").entered();
        let guard = WRITE_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let tx = self.pool.begin().await?;
        Ok((guard, tx))
    }
    /// Creates the schema from the bundled `schema.sql` and records the
    /// metadata rows (schema_version, model_name, dimensions, created_at,
    /// cq_version) in a single write transaction.
    ///
    /// Metadata uses `INSERT OR REPLACE`, so re-running refreshes the rows
    /// rather than failing.
    pub fn init(&self, model_info: &ModelInfo) -> Result<(), StoreError> {
        let _span = tracing::info_span!("Store::init").entered();
        self.rt.block_on(async {
            let (_guard, mut tx) = self.begin_write().await?;
            // The driver executes one statement at a time, so the schema
            // file is split first (triggers kept whole).
            let schema = include_str!("../schema.sql");
            for stmt in split_sql_statements(schema) {
                if stmt.is_empty() {
                    continue;
                }
                sqlx::query(&stmt).execute(&mut *tx).await?;
            }
            let now = chrono::Utc::now().to_rfc3339();
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("schema_version")
                .bind(CURRENT_SCHEMA_VERSION.to_string())
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("model_name")
                .bind(&model_info.name)
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("dimensions")
                .bind(model_info.dimensions.to_string())
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("created_at")
                .bind(&now)
                .execute(&mut *tx)
                .await?;
            sqlx::query("INSERT OR REPLACE INTO metadata (key, value) VALUES (?1, ?2)")
                .bind("cq_version")
                .bind(env!("CARGO_PKG_VERSION"))
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            tracing::info!(
                schema_version = CURRENT_SCHEMA_VERSION,
                "Schema initialized"
            );
            Ok(())
        })
    }
}
impl<Mode> Store<Mode> {
    /// Empties every in-memory cache so subsequent reads refetch from SQLite.
    ///
    /// Poisoned `RwLock`s are recovered via `into_inner`; the `OnceLock`
    /// caches are reset by replacing them with fresh instances (which is why
    /// this takes `&mut self`).
    pub fn clear_caches(&mut self) {
        let _span = tracing::debug_span!("store_clear_caches").entered();
        *self
            .notes_summaries_cache
            .write()
            .unwrap_or_else(|e| e.into_inner()) = None;
        *self
            .note_boost_cache
            .write()
            .unwrap_or_else(|e| e.into_inner()) = None;
        self.call_graph_cache = std::sync::OnceLock::new();
        self.test_chunks_cache = std::sync::OnceLock::new();
        self.chunk_type_map_cache = std::sync::OnceLock::new();
        tracing::debug!("Store caches cleared");
    }
    /// Checkpoints the WAL (TRUNCATE) and closes the connection pool.
    ///
    /// Sets the `closed` flag first so the `Drop` impl, which runs when
    /// `self` goes out of scope at the end of this method, skips its own
    /// checkpoint attempt.
    pub fn close(self) -> Result<(), StoreError> {
        self.closed.store(true, Ordering::Release);
        self.rt.block_on(async {
            sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
                .execute(&self.pool)
                .await?;
            tracing::debug!("WAL checkpoint completed");
            self.pool.close().await;
            Ok(())
        })
    }
}
/// Splits a schema SQL file into individually executable statements.
///
/// The driver executes one statement at a time, so input is split on
/// terminating `;` — except inside `CREATE TRIGGER ... BEGIN ... END`
/// bodies, where inner statements end with `;` but belong to the enclosing
/// trigger. Blank and `--` comment lines between statements are dropped;
/// each returned statement has its trailing `;` trimmed.
fn split_sql_statements(sql: &str) -> Vec<String> {
    let mut statements: Vec<String> = Vec::new();
    let mut current = String::new();
    let mut in_trigger_body = false;
    for raw_line in sql.lines() {
        let trimmed = raw_line.trim();
        // Skip blank/comment lines only *between* statements, so comments
        // inside a statement body are preserved.
        if current.trim().is_empty() && (trimmed.is_empty() || trimmed.starts_with("--")) {
            continue;
        }
        let upper = trimmed.to_ascii_uppercase();
        // A line that is (or ends with) BEGIN opens a trigger body; suspend
        // splitting on `;` until the matching END.
        if !in_trigger_body && (upper == "BEGIN" || upper.ends_with(" BEGIN")) {
            in_trigger_body = true;
        }
        current.push_str(raw_line);
        current.push('\n');
        // END (bare, or followed by `;` and optional trailing text) closes
        // the trigger body. (`starts_with("END;")` already covers the exact
        // "END;" case, which was previously tested redundantly.)
        if in_trigger_body && (upper.starts_with("END;") || upper == "END") {
            in_trigger_body = false;
            statements.push(current.trim().trim_end_matches(';').trim().to_string());
            current.clear();
            continue;
        }
        if !in_trigger_body && trimmed.ends_with(';') {
            let stmt = current.trim().trim_end_matches(';').trim().to_string();
            if !stmt.is_empty() {
                statements.push(stmt);
            }
            current.clear();
        }
    }
    // Keep any unterminated trailing statement rather than dropping it.
    let tail = current.trim().to_string();
    if !tail.is_empty() {
        statements.push(tail);
    }
    statements
}
#[cfg(test)]
mod sql_split_tests {
    //! Unit tests for `split_sql_statements`: plain statements, trigger
    //! bodies, comment skipping, and empty input.
    use super::split_sql_statements;
    #[test]
    fn test_split_single_table() {
        let sql = "CREATE TABLE foo (id INTEGER);";
        let stmts = split_sql_statements(sql);
        assert_eq!(stmts.len(), 1);
        assert!(stmts[0].starts_with("CREATE TABLE foo"));
    }
    #[test]
    fn test_split_multiple_statements() {
        let sql = "CREATE TABLE foo (id INTEGER);\nCREATE INDEX idx_foo ON foo(id);";
        let stmts = split_sql_statements(sql);
        assert_eq!(stmts.len(), 2);
    }
    // The `;` inside BEGIN..END must not terminate the trigger statement.
    #[test]
    fn test_split_trigger_body_preserved() {
        let sql = "\
CREATE TABLE foo (id INTEGER);
CREATE TRIGGER bump_on_delete
AFTER DELETE ON foo
BEGIN
INSERT INTO bar (x) VALUES (1);
END;
CREATE TABLE baz (id INTEGER);
";
        let stmts = split_sql_statements(sql);
        assert_eq!(
            stmts.len(),
            3,
            "foo + trigger + baz — trigger body must not be cut"
        );
        assert!(stmts[1].contains("CREATE TRIGGER"));
        assert!(
            stmts[1].contains("INSERT INTO bar"),
            "trigger body must be preserved intact"
        );
        assert!(stmts[1].contains("END"), "trigger END must be included");
    }
    #[test]
    fn test_split_skips_leading_comments() {
        let sql = "-- comment\n-- another\nCREATE TABLE foo (id INTEGER);";
        let stmts = split_sql_statements(sql);
        assert_eq!(stmts.len(), 1);
        assert!(stmts[0].starts_with("CREATE TABLE"));
    }
    #[test]
    fn test_split_empty_input() {
        assert!(split_sql_statements("").is_empty());
        assert!(split_sql_statements("-- just a comment\n").is_empty());
        assert!(split_sql_statements("\n\n\n").is_empty());
    }
}
impl<Mode> Drop for Store<Mode> {
    /// Best-effort WAL checkpoint on drop; skipped when `close()` already
    /// ran (it sets the `closed` flag before dropping).
    ///
    /// The checkpoint is wrapped in `catch_unwind` so a panic from
    /// `block_on` (e.g. this drop running inside an async context) cannot
    /// escape `drop` — a panic escaping Drop during unwinding would abort
    /// the process. Both the checkpoint error and a caught panic are only
    /// logged.
    fn drop(&mut self) {
        if self.closed.load(Ordering::Acquire) {
            return; }
        if let Err(payload) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            if let Err(e) = self.rt.block_on(async {
                sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
                    .execute(&self.pool)
                    .await
            }) {
                tracing::warn!(error = %e, "WAL checkpoint on drop failed (non-fatal)");
            }
        })) {
            // Panic payloads are usually &str or String; fall back otherwise.
            let msg = payload
                .downcast_ref::<&str>()
                .copied()
                .or_else(|| payload.downcast_ref::<String>().map(|s| s.as_str()))
                .unwrap_or("unknown panic");
            tracing::warn!(
                panic = msg,
                "WAL checkpoint panic caught in Store::drop (non-fatal)"
            );
        }
    }
}
#[cfg(test)]
mod tests {
    //! Property tests for FTS sanitization plus integration tests for the
    //! open/init lifecycle against temp-dir databases.
    use super::*;
    use proptest::prelude::*;
    use crate::nl::normalize_for_fts;
    proptest! {
        // Normalization must never panic on arbitrary printable input.
        #[test]
        fn fuzz_normalize_for_fts_no_panic(input in "\\PC{0,500}") {
            let _ = normalize_for_fts(&input);
        }
        #[test]
        fn fuzz_normalize_for_fts_safe_output(input in "\\PC{0,200}") {
            let result = normalize_for_fts(&input);
            for c in result.chars() {
                prop_assert!(
                    c.is_alphanumeric() || c == ' ' || c == '_',
                    "Unexpected char '{}' (U+{:04X}) in output: {}",
                    c, c as u32, result
                );
            }
        }
        #[test]
        fn fuzz_normalize_for_fts_special_chars(
            prefix in "[a-z]{0,10}",
            special in prop::sample::select(vec!['*', '"', ':', '^', '(', ')', '-', '+']),
            suffix in "[a-z]{0,10}"
        ) {
            let input = format!("{}{}{}", prefix, special, suffix);
            let result = normalize_for_fts(&input);
            prop_assert!(
                !result.contains(special),
                "Special char '{}' should be stripped from: {} -> {}",
                special, input, result
            );
        }
        #[test]
        fn fuzz_normalize_for_fts_unicode(input in "[\\p{L}\\p{N}\\s]{0,100}") {
            let result = normalize_for_fts(&input);
            prop_assert!(result.len() <= input.len() * 4);
        }
        #[test]
        fn prop_sanitize_no_special_chars(input in "\\PC{0,500}") {
            let result = sanitize_fts_query(&input);
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "FTS5 special char '{}' in sanitized output: {}",
                    c, result
                );
            }
        }
        #[test]
        fn prop_sanitize_no_operators(input in "\\PC{0,300}") {
            let result = sanitize_fts_query(&input);
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "FTS5 operator '{}' survived sanitization: {}",
                    word, result
                );
            }
        }
        // Full pipeline (normalize, then sanitize) must also be FTS5-safe.
        #[test]
        fn prop_pipeline_safe(input in "\\PC{0,300}") {
            let result = sanitize_fts_query(&normalize_for_fts(&input));
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "Special char '{}' in pipeline output: {}",
                    c, result
                );
            }
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "Operator '{}' in pipeline output: {}",
                    word, result
                );
            }
        }
        #[test]
        fn prop_sanitize_all_special(
            chars in prop::collection::vec(
                prop::sample::select(vec!['"', '*', '(', ')', '+', '-', '^', ':']),
                1..50
            )
        ) {
            let input: String = chars.into_iter().collect();
            let result = sanitize_fts_query(&input);
            prop_assert!(
                result.is_empty(),
                "All-special input should produce empty output, got: {}",
                result
            );
        }
        #[test]
        fn prop_sanitize_operators_removed(
            pre in "[a-z]{1,10}",
            op in prop::sample::select(vec!["OR", "AND", "NOT", "NEAR"]),
            post in "[a-z]{1,10}"
        ) {
            let input = format!("{} {} {}", pre, op, post);
            let result = sanitize_fts_query(&input);
            prop_assert!(
                !result.split_whitespace().any(|w| w == op),
                "Operator '{}' not stripped from: {} -> {}",
                op, input, result
            );
            prop_assert!(result.contains(&pre), "Pre-text '{}' missing from: {}", pre, result);
            prop_assert!(result.contains(&post), "Post-text '{}' missing from: {}", post, result);
        }
        #[test]
        fn prop_sanitize_adversarial(
            normal in "[a-z]{1,10}",
            special in prop::sample::select(vec!['"', '*', '(', ')', '+', '-', '^', ':']),
            op in prop::sample::select(vec!["OR", "AND", "NOT", "NEAR"]),
        ) {
            let input = format!("{}{} {} {}{}", special, normal, op, normal, special);
            let result = sanitize_fts_query(&input);
            for c in result.chars() {
                prop_assert!(
                    !matches!(c, '"' | '*' | '(' | ')' | '+' | '-' | '^' | ':'),
                    "Special char '{}' in adversarial output: {}",
                    c, result
                );
            }
            for word in result.split_whitespace() {
                prop_assert!(
                    !matches!(word, "OR" | "AND" | "NOT" | "NEAR"),
                    "Operator '{}' in adversarial output: {}",
                    word, result
                );
            }
        }
    }
    /// Creates a schema-initialized read-write store in a fresh temp dir.
    /// The TempDir must stay alive for the store's lifetime.
    fn make_test_store_initialized() -> (Store, tempfile::TempDir) {
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join(crate::INDEX_DB_FILENAME);
        let store = Store::open(&db_path).unwrap();
        store.init(&ModelInfo::default()).unwrap();
        (store, dir)
    }
    #[test]
    fn concurrent_readonly_opens() {
        let (_writer, dir) = make_test_store_initialized();
        let db_path = dir.path().join(crate::INDEX_DB_FILENAME);
        let ro1 = Store::open_readonly(&db_path).expect("first readonly open failed");
        let ro2 = Store::open_readonly(&db_path).expect("second readonly open failed");
        assert!(ro1.check_model_version().is_ok());
        assert!(ro2.check_model_version().is_ok());
    }
    #[test]
    fn readonly_open_while_writer_holds() {
        let (writer, dir) = make_test_store_initialized();
        let db_path = dir.path().join(crate::INDEX_DB_FILENAME);
        let ro = Store::open_readonly(&db_path).expect("readonly open failed while writer active");
        assert!(ro.check_model_version().is_ok());
        drop(writer);
    }
    // Renamed from `onclock_...` (typo). Documents intended behavior: the
    // OnceLock caches fill once and stay stale across same-handle writes.
    #[test]
    fn oncelock_cache_not_invalidated_by_writes() {
        let (store, _dir) = make_test_store_initialized();
        let graph_before = store.get_call_graph().expect("first get_call_graph failed");
        let callers_before = graph_before.forward.len();
        store
            .upsert_function_calls(
                std::path::Path::new("test.rs"),
                &[crate::parser::FunctionCalls {
                    name: "caller".to_string(),
                    line_start: 1,
                    calls: vec![crate::parser::CallSite {
                        callee_name: "callee".to_string(),
                        line_number: 2,
                    }],
                }],
            )
            .unwrap();
        let graph_after = store
            .get_call_graph()
            .expect("second get_call_graph failed");
        assert_eq!(
            graph_after.forward.len(),
            callers_before,
            "OnceLock cache should not be invalidated by writes within the same Store lifetime"
        );
    }
    #[test]
    fn double_init_is_idempotent() {
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join(crate::INDEX_DB_FILENAME);
        let store = Store::open(&db_path).unwrap();
        store
            .init(&ModelInfo::default())
            .expect("first init() failed");
        store
            .init(&ModelInfo::default())
            .expect("second init() should be idempotent but failed");
    }
    // Write a single chunk with a writer store, then verify the minimal
    // (small-mmap/cache) read-only mode can read it back.
    #[test]
    fn open_readonly_small_roundtrip() {
        use crate::embedder::Embedding;
        use crate::parser::{Chunk, ChunkType, Language};
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join("index.db");
        {
            let store = Store::open(&db_path).unwrap();
            store.init(&ModelInfo::default()).unwrap();
            let content = "fn small() {}";
            let hash = blake3::hash(content.as_bytes()).to_hex().to_string();
            let chunk = Chunk {
                id: format!("small.rs:1:{}", &hash[..8]),
                file: std::path::PathBuf::from("small.rs"),
                language: Language::Rust,
                chunk_type: ChunkType::Function,
                name: "small".to_string(),
                signature: "fn small()".to_string(),
                content: content.to_string(),
                doc: None,
                line_start: 1,
                line_end: 1,
                content_hash: hash,
                parent_id: None,
                window_idx: None,
                parent_type_name: None,
            };
            let mut v = vec![0.0f32; store.dim()];
            if !v.is_empty() {
                v[0] = 1.0;
            }
            let embedding = Embedding::new(v);
            store.upsert_chunk(&chunk, &embedding, Some(100)).unwrap();
        }
        let ro = Store::open_readonly_small(&db_path)
            .expect("open_readonly_small should succeed on a populated index");
        ro.check_model_version()
            .expect("schema/model check should pass on small-mode open");
        let stats = ro.stats().expect("stats() should work on small-mode store");
        assert_eq!(
            stats.total_chunks, 1,
            "small-mode store must see the fixture chunk"
        );
        let chunks = ro
            .get_chunks_by_origin("small.rs")
            .expect("get_chunks_by_origin should work on small-mode store");
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].name, "small");
    }
    #[test]
    fn open_readonly_after_init_happy_path() {
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join("index.db");
        let store = Store::<ReadOnly>::open_readonly_after_init(&db_path, |s| {
            s.init(&ModelInfo::default())?;
            s.rt.block_on(async {
                sqlx::query(
                    "INSERT INTO function_calls (file, caller_name, callee_name, caller_line, call_line)
                     VALUES ('test.rs', 'caller_fn', 'callee_fn', 1, 2)",
                )
                .execute(&s.pool)
                .await
            })?;
            Ok(())
        })
        .expect("open_readonly_after_init happy path failed");
        let count: i64 = store
            .rt
            .block_on(async {
                sqlx::query_scalar::<_, i64>(
                    "SELECT COUNT(*) FROM function_calls WHERE caller_name = 'caller_fn'",
                )
                .fetch_one(&store.pool)
                .await
            })
            .expect("query after reopen failed");
        assert_eq!(count, 1, "RO handle did not see the fixture write");
    }
    // A closure error must surface from the constructor, and a later
    // successful attempt on the same path must still work.
    #[test]
    fn open_readonly_after_init_closure_error_propagates() {
        let dir = tempfile::TempDir::new().unwrap();
        let db_path = dir.path().join("index.db");
        match Store::<ReadOnly>::open_readonly_after_init(&db_path, |_s| {
            Err(StoreError::Runtime("closure rejected the init".to_string()))
        }) {
            Ok(_) => panic!("constructor should have surfaced closure error"),
            Err(StoreError::Runtime(msg)) => assert_eq!(msg, "closure rejected the init"),
            Err(other) => panic!("expected Runtime error, got {other:?}"),
        }
        let recovered = Store::<ReadOnly>::open_readonly_after_init(&db_path, |s| {
            s.init(&ModelInfo::default())?;
            Ok(())
        })
        .expect("second attempt after closure error should succeed");
        assert!(recovered.check_model_version().is_ok());
    }
}