use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::time::{Duration, SystemTime};
use anyhow::{bail, Context, Result};
use notify::{Config, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher};
use tracing::{info, info_span, warn};
use cqs::embedder::{Embedder, Embedding, ModelConfig};
use cqs::generate_nl_description;
use cqs::hnsw::HnswIndex;
use cqs::note::parse_notes;
use cqs::parser::{ChunkTypeRefs, Parser as CqParser};
use cqs::store::Store;
use super::{check_interrupted, find_project_root, try_acquire_index_lock, Cli};
/// RAII guard holding the daemon's Unix-socket path; its `Drop` impl removes
/// the socket file so a clean shutdown does not leave a stale socket behind.
#[cfg(unix)]
struct SocketCleanupGuard(PathBuf);
#[cfg(unix)]
impl Drop for SocketCleanupGuard {
    /// Best-effort removal of the daemon socket file on shutdown.
    ///
    /// Calls `remove_file` directly and treats `NotFound` as "already clean"
    /// instead of checking `exists()` first: the previous check-then-remove
    /// sequence was a TOCTOU race (the file could vanish between the two
    /// calls) and cost an extra stat syscall.
    fn drop(&mut self) {
        match std::fs::remove_file(&self.0) {
            Ok(()) => {
                tracing::info!(path = %self.0.display(), "Daemon socket removed");
            }
            // Already gone — nothing to clean up, not worth a warning.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => {
                tracing::warn!(path = %self.0.display(), error = %e, "Failed to remove socket file");
            }
        }
    }
}
/// Set to `true` by the SIGTERM handler (`on_sigterm`) and polled by the
/// watch/daemon loops so they can drain cleanly. The Release store pairs with
/// the Acquire load in `is_shutdown_requested`.
#[cfg(unix)]
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
/// Returns `true` once SIGTERM has been observed (flag set by `on_sigterm`).
#[cfg(unix)]
fn is_shutdown_requested() -> bool {
    SHUTDOWN_REQUESTED.load(Ordering::Acquire)
}
/// True when the daemon should stop: SIGTERM was received, or the
/// process-wide interrupt flag (Ctrl+C) reported by `check_interrupted` is set.
#[cfg(unix)]
fn daemon_should_exit() -> bool {
    is_shutdown_requested() || check_interrupted()
}
/// SIGTERM handler. Performs only a single atomic store, which is
/// async-signal-safe; all real shutdown work happens on normal threads that
/// poll `SHUTDOWN_REQUESTED`.
#[cfg(unix)]
extern "C" fn on_sigterm(_sig: libc::c_int) {
    SHUTDOWN_REQUESTED.store(true, Ordering::Release);
}
/// Hard cap on simultaneously-served daemon socket clients; connections past
/// this limit are rejected with a "daemon busy" error in the accept loop.
#[cfg(unix)]
const MAX_CONCURRENT_DAEMON_CLIENTS: usize = 64;
/// Builds the tokio runtime shared by the Store, the embedding cache and the
/// query cache.
///
/// Worker-thread count is the machine's available parallelism clamped to at
/// most 4, falling back to 1 when parallelism cannot be queried.
fn build_shared_runtime() -> std::io::Result<Arc<tokio::runtime::Runtime>> {
    let n_workers = match std::thread::available_parallelism() {
        Ok(n) => n.get().min(4),
        Err(_) => 1,
    };
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(n_workers)
        .enable_all()
        .thread_name("cqs-shared-rt")
        .build()?;
    tracing::debug!(
        worker_threads = n_workers,
        "Built shared tokio runtime for Store/EmbeddingCache/QueryCache"
    );
    Ok(Arc::new(runtime))
}
/// Installs `on_sigterm` as the process SIGTERM handler so the watch/daemon
/// loops can drain cleanly instead of being killed mid-cycle.
///
/// On failure the previous disposition stays in place and a warning is
/// logged; Ctrl+C (SIGINT) handling still works independently.
#[cfg(unix)]
fn install_sigterm_handler() {
    // SAFETY: `libc::signal` is given a valid `extern "C"` fn pointer, and the
    // handler body only performs an atomic store (async-signal-safe).
    unsafe {
        // Fn pointer -> sighandler_t (an integer type) goes through *const ().
        let prev = libc::signal(libc::SIGTERM, on_sigterm as *const () as libc::sighandler_t);
        if prev == libc::SIG_ERR {
            let e = std::io::Error::last_os_error();
            tracing::warn!(error = %e, "Failed to install SIGTERM handler; watch will rely on SIGINT only");
        } else {
            tracing::debug!("SIGTERM handler installed for clean daemon shutdown");
        }
    }
}
/// Serves one daemon client connection end-to-end.
///
/// Wire protocol: the client writes a single JSON line
/// `{"command": "...", "args": ["..."]}` and receives exactly one JSON line
/// back — `{"status":"ok","output":...}` on success, or
/// `{"status":"error","message":...}` otherwise. Requests over 1 MiB are
/// rejected.
///
/// Dispatch runs under `catch_unwind`, so a panic in a command handler is
/// reported to the client and logged without taking the daemon down.
#[cfg(unix)]
fn handle_socket_client(
    mut stream: std::os::unix::net::UnixStream,
    batch_ctx: &Mutex<super::batch::BatchContext>,
) {
    let span = tracing::info_span!("daemon_query", command = tracing::field::Empty);
    let _enter = span.enter();
    let start = std::time::Instant::now();
    // Timeouts keep one slow or stalled client from pinning this handler.
    if let Err(e) = stream.set_read_timeout(Some(Duration::from_secs(5))) {
        tracing::warn!(
            error = %e,
            "Failed to set read timeout on daemon stream — slow client could pin handler"
        );
    }
    if let Err(e) = stream.set_write_timeout(Some(Duration::from_secs(30))) {
        tracing::warn!(
            error = %e,
            "Failed to set write timeout on daemon stream — slow client could pin handler"
        );
    }
    use std::io::Read as _;
    // Cap readable bytes at 1 MiB + 1: reading more than 1 MiB is how an
    // oversized request is detected in the match below.
    let mut reader = std::io::BufReader::new(&stream).take(1_048_577);
    let mut line = String::new();
    match std::io::BufRead::read_line(&mut reader, &mut line) {
        // Client closed the connection without sending anything.
        Ok(0) => return,
        Ok(n) if n > 1_048_576 => {
            let delivered = write_daemon_error_tracked(&mut stream, "request too large");
            tracing::info!(
                status = "client_error",
                delivered,
                latency_ms = start.elapsed().as_millis() as u64,
                "Daemon query complete"
            );
            return;
        }
        Err(e) => {
            tracing::debug!(error = %e, "Socket read failed");
            return;
        }
        Ok(_) => {}
    }
    let request: serde_json::Value = match serde_json::from_str(line.trim()) {
        Ok(v) => v,
        Err(e) => {
            let delivered = write_daemon_error_tracked(&mut stream, &format!("invalid JSON: {e}"));
            tracing::info!(
                status = "parse_error",
                delivered,
                latency_ms = start.elapsed().as_millis() as u64,
                "Daemon query complete"
            );
            return;
        }
    };
    let command = request
        .get("command")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    // Non-string array entries are silently dropped.
    let args: Vec<String> = request
        .get("args")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect()
        })
        .unwrap_or_default();
    // For "notes" only the subcommand is logged — note bodies may be private.
    let command_for_log: String = if command == "notes" {
        let sub = args.first().map(String::as_str).unwrap_or("<unknown>");
        match sub {
            "add" | "update" | "remove" | "list" => format!("notes/{sub}"),
            _ => "notes/<unknown>".to_string(),
        }
    } else {
        command.to_string()
    };
    span.record("command", command_for_log.as_str());
    // Args preview: redacted for notes, otherwise truncated to 80 chars on a
    // char boundary (char_indices avoids slicing mid-UTF-8).
    let args_preview: String = if command == "notes" {
        "<redacted>".to_string()
    } else {
        let joined = args.join(" ");
        let end = joined
            .char_indices()
            .nth(80)
            .map(|(i, _)| i)
            .unwrap_or(joined.len());
        joined[..end].to_string()
    };
    tracing::debug!(
        command = %command_for_log,
        args_len = args.len(),
        args_preview = %args_preview,
        "Daemon request"
    );
    if command.is_empty() {
        let delivered = write_daemon_error_tracked(&mut stream, "missing 'command' field");
        tracing::info!(
            status = "client_error",
            delivered,
            latency_ms = start.elapsed().as_millis() as u64,
            "Daemon query complete"
        );
        return;
    }
    // Dispatch under catch_unwind so a panicking handler only fails this one
    // request. AssertUnwindSafe: shared state sits behind the Mutex, and a
    // poisoned lock is explicitly recovered below.
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let full_line = if args.is_empty() {
            command.to_string()
        } else {
            format!("{} {}", command, shell_words::join(&args))
        };
        let mut output = Vec::new();
        {
            // Recover from a poisoned mutex rather than propagating the panic.
            let ctx = batch_ctx
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            ctx.dispatch_line(&full_line, &mut output);
        }
        String::from_utf8(output).map_err(|e| format!("non-UTF-8 output: {e}"))
    }));
    let (status, delivered) = match result {
        Ok(Ok(output)) => {
            let resp = serde_json::json!({
                "status": "ok",
                "output": output.trim_end(),
            });
            let delivered = match writeln!(stream, "{}", resp) {
                Ok(()) => true,
                Err(e) => {
                    tracing::debug!(error = %e, "Failed to write daemon response");
                    false
                }
            };
            ("ok", delivered)
        }
        Ok(Err(e)) => {
            let delivered = write_daemon_error_tracked(&mut stream, &e);
            ("client_error", delivered)
        }
        Err(payload) => {
            // Panic payloads are almost always String or &'static str.
            let msg = payload
                .downcast_ref::<String>()
                .map(String::as_str)
                .or_else(|| payload.downcast_ref::<&'static str>().copied())
                .unwrap_or("<non-string panic payload>");
            let delivered =
                write_daemon_error_tracked(&mut stream, "internal error (panic in dispatch)");
            tracing::error!(
                panic_msg = %msg,
                "Daemon query panicked — daemon continues"
            );
            ("panic", delivered)
        }
    };
    tracing::info!(
        status,
        delivered,
        latency_ms = start.elapsed().as_millis() as u64,
        "Daemon query complete"
    );
}
#[cfg(unix)]
fn write_daemon_error(
stream: &mut std::os::unix::net::UnixStream,
message: &str,
) -> std::io::Result<()> {
use std::io::Write;
let resp = serde_json::json!({ "status": "error", "message": message });
writeln!(stream, "{}", resp)
}
#[cfg(unix)]
fn write_daemon_error_tracked(stream: &mut std::os::unix::net::UnixStream, message: &str) -> bool {
match write_daemon_error(stream, message) {
Ok(()) => true,
Err(e) => {
tracing::debug!(error = %e, "Failed to write daemon error response");
false
}
}
}
/// Identity of the DB file as a (device, inode) pair, or `None` when the file
/// cannot be stat'ed. Used to detect `index.db` being swapped out from under
/// the daemon (e.g. by `cqs index --force`).
#[cfg(unix)]
fn db_file_identity(path: &Path) -> Option<(u64, u64)> {
    use std::os::unix::fs::MetadataExt;
    std::fs::metadata(path)
        .ok()
        .map(|meta| (meta.dev(), meta.ino()))
}
/// Identity of the DB file on non-unix platforms: its modification time
/// (device/inode pairs are unavailable), or `None` if it cannot be stat'ed.
#[cfg(not(unix))]
fn db_file_identity(path: &Path) -> Option<SystemTime> {
    let meta = std::fs::metadata(path).ok()?;
    meta.modified().ok()
}
/// Number of incremental HNSW inserts after which the watch loop performs a
/// full rebuild. Overridable via `CQS_WATCH_REBUILD_THRESHOLD`; defaults to
/// 100. Parsed once and memoized for the process lifetime.
fn hnsw_rebuild_threshold() -> usize {
    static CACHE: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *CACHE.get_or_init(|| match std::env::var("CQS_WATCH_REBUILD_THRESHOLD") {
        Ok(raw) => raw.parse().unwrap_or(100),
        Err(_) => 100,
    })
}
/// Cap on the watch loop's pending-file queue; events past this cap are
/// dropped and counted. Overridable via `CQS_WATCH_MAX_PENDING`; defaults to
/// 10,000. Parsed once and memoized for the process lifetime.
fn max_pending_files() -> usize {
    static CACHE: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *CACHE.get_or_init(|| match std::env::var("CQS_WATCH_MAX_PENDING") {
        Ok(raw) => raw.parse().unwrap_or(10_000),
        Err(_) => 10_000,
    })
}
/// Immutable configuration shared by the watch-loop helpers
/// (`collect_events`, `process_file_changes`, ...).
struct WatchConfig<'a> {
    // Project root being watched.
    root: &'a Path,
    // The index directory; filesystem events under it are ignored.
    cqs_dir: &'a Path,
    // Path of docs/notes.toml (canonicalized by `cmd_watch` when possible).
    notes_path: &'a Path,
    // File extensions the parser supports; everything else is skipped.
    supported_ext: &'a HashSet<&'a str>,
    parser: &'a CqParser,
    // Lazily-initialized embedder, shared with the daemon thread.
    embedder: &'a std::sync::OnceLock<std::sync::Arc<Embedder>>,
    // Suppresses stdout progress output when true.
    quiet: bool,
    model_config: &'a ModelConfig,
    // Optional gitignore matcher; `None` disables gitignore filtering.
    gitignore: &'a std::sync::RwLock<Option<ignore::gitignore::Gitignore>>,
    // Optional SPLADE encoder for incremental sparse-vector updates.
    splade_encoder: Option<&'a std::sync::Mutex<cqs::splade::SpladeEncoder>>,
}
/// Mutable state carried across watch-loop cycles.
struct WatchState {
    // Exponential backoff for repeated embedder init failures.
    embedder_backoff: EmbedderBackoff,
    // Root-relative files changed but not yet reindexed (capped by
    // `max_pending_files`).
    pending_files: HashSet<PathBuf>,
    // True when docs/notes.toml changed since the last cycle.
    pending_notes: bool,
    // Time of the most recent relevant filesystem event (debounce anchor).
    last_event: std::time::Instant,
    // Per-file mtime at last successful index, used to skip no-op events.
    last_indexed_mtime: HashMap<PathBuf, SystemTime>,
    // In-memory HNSW index; `None` forces a full rebuild next cycle.
    hnsw_index: Option<HnswIndex>,
    // Incremental inserts since the last full HNSW rebuild.
    incremental_count: usize,
    // Events dropped this cycle because `pending_files` hit its cap.
    dropped_this_cycle: usize,
}
/// Exponential-backoff tracker for embedder initialization retries.
struct EmbedderBackoff {
    // Consecutive failures so far (saturating).
    failures: u32,
    // Earliest instant at which another init attempt is allowed.
    next_retry: std::time::Instant,
}
impl EmbedderBackoff {
fn new() -> Self {
Self {
failures: 0,
next_retry: std::time::Instant::now(),
}
}
fn record_failure(&mut self) {
self.failures = self.failures.saturating_add(1);
let delay_secs = 2u64.saturating_pow(self.failures).min(300);
self.next_retry = std::time::Instant::now() + Duration::from_secs(delay_secs);
warn!(
failures = self.failures,
next_retry_secs = delay_secs,
"Embedder init failed, backing off"
);
}
fn reset(&mut self) {
self.failures = 0;
self.next_retry = std::time::Instant::now();
}
fn should_retry(&self) -> bool {
std::time::Instant::now() >= self.next_retry
}
}
fn try_init_embedder<'a>(
embedder: &'a std::sync::OnceLock<std::sync::Arc<Embedder>>,
backoff: &mut EmbedderBackoff,
model_config: &ModelConfig,
) -> Option<&'a Embedder> {
match embedder.get() {
Some(e) => Some(e.as_ref()),
None => {
if !backoff.should_retry() {
return None;
}
match Embedder::new(model_config.clone()) {
Ok(e) => {
backoff.reset();
Some(embedder.get_or_init(|| std::sync::Arc::new(e)).as_ref())
}
Err(e) => {
warn!(error = %e, "Failed to initialize embedder");
backoff.record_failure();
None
}
}
}
}
}
/// True when `path` lives under the WSL automount root (default `/mnt/`),
/// i.e. on a Windows filesystem mount where inotify is unreliable.
/// The automount root is parsed from /etc/wsl.conf once and cached.
fn is_under_wsl_automount(path: &str) -> bool {
    static AUTOMOUNT_ROOT: std::sync::OnceLock<String> = std::sync::OnceLock::new();
    let mount_root = AUTOMOUNT_ROOT
        .get_or_init(|| parse_wsl_automount_root().unwrap_or_else(|| String::from("/mnt/")));
    path.starts_with(mount_root.as_str())
}
/// Reads /etc/wsl.conf and returns the `[automount]` section's `root` value,
/// normalized to end with `/`. `None` when the file is missing/unreadable or
/// the key is absent. The I/O is separated from the parsing so the parser
/// can be unit-tested.
fn parse_wsl_automount_root() -> Option<String> {
    let content = std::fs::read_to_string("/etc/wsl.conf").ok()?;
    parse_automount_root(&content)
}

/// Pure parser for the wsl.conf `[automount] root = ...` setting.
///
/// Section and key names are matched case-insensitively; the returned root is
/// guaranteed to end with `/`.
fn parse_automount_root(content: &str) -> Option<String> {
    let mut in_automount = false;
    for line in content.lines() {
        let trimmed = line.trim();
        // Section header: track whether we are inside [automount].
        if trimmed.starts_with('[') {
            in_automount = trimmed
                .trim_start_matches('[')
                .trim_end_matches(']')
                .trim()
                .eq_ignore_ascii_case("automount");
            continue;
        }
        if in_automount {
            if let Some((key, value)) = trimmed.split_once('=') {
                if key.trim().eq_ignore_ascii_case("root") {
                    // NOTE(review): inline `#` comments after the value are not
                    // stripped — assumed absent in real wsl.conf files; confirm.
                    let mut root = value.trim().to_string();
                    if !root.ends_with('/') {
                        root.push('/');
                    }
                    return Some(root);
                }
            }
        }
    }
    None
}
/// Builds the gitignore matcher used to filter watch events.
///
/// Returns `None` — meaning no gitignore filtering at all — when filtering is
/// disabled via `CQS_WATCH_RESPECT_GITIGNORE=0`, when the project root has no
/// .gitignore, or when the file cannot be read or compiled. Only the root
/// .gitignore is consulted here; nested .gitignore files are not loaded.
///
/// Fix: the unreadable/malformed warning previously claimed we were "falling
/// back to empty matcher", but this path returns `None`, which disables
/// filtering entirely; the message now states that accurately.
fn build_gitignore_matcher(root: &Path) -> Option<ignore::gitignore::Gitignore> {
    let _span = tracing::info_span!("build_gitignore_matcher").entered();
    if std::env::var("CQS_WATCH_RESPECT_GITIGNORE").as_deref() == Ok("0") {
        tracing::info!("CQS_WATCH_RESPECT_GITIGNORE=0 — gitignore filtering disabled");
        return None;
    }
    let root_gitignore = root.join(".gitignore");
    if !root_gitignore.exists() {
        tracing::info!(
            root = %root.display(),
            "no .gitignore at project root — watch will not filter by gitignore"
        );
        return None;
    }
    let mut builder = ignore::gitignore::GitignoreBuilder::new(root);
    // GitignoreBuilder::add returns Some(error) on failure, not a Result.
    if let Some(err) = builder.add(&root_gitignore) {
        tracing::warn!(
            path = %root_gitignore.display(),
            error = %err,
            "root .gitignore unreadable or malformed — watch will not filter by gitignore"
        );
        return None;
    }
    match builder.build() {
        Ok(gi) => {
            tracing::info!(
                n_files = gi.num_ignores(),
                "gitignore matcher loaded for watch loop"
            );
            Some(gi)
        }
        Err(err) => {
            tracing::warn!(
                error = %err,
                "gitignore matcher build failed — watch will not filter by gitignore"
            );
            None
        }
    }
}
/// Loads the SPLADE encoder used for incremental sparse-vector updates in
/// watch mode.
///
/// Returns `None` (dense-only operation) when the feature is disabled via
/// `CQS_WATCH_INCREMENTAL_SPLADE=0`, when no model is configured, or when
/// the model fails to load; each case logs its sparse-coverage implications.
fn build_splade_encoder_for_watch() -> Option<cqs::splade::SpladeEncoder> {
    let _span = tracing::info_span!("build_splade_encoder_for_watch").entered();
    if std::env::var("CQS_WATCH_INCREMENTAL_SPLADE").as_deref() == Ok("0") {
        tracing::info!(
            "CQS_WATCH_INCREMENTAL_SPLADE=0 — daemon runs dense-only, \
            sparse coverage will drift until manual 'cqs index'"
        );
        return None;
    }
    let model_dir = match cqs::splade::resolve_splade_model_dir() {
        Some(dir) => dir,
        None => {
            tracing::info!("No SPLADE model configured — incremental SPLADE disabled");
            return None;
        }
    };
    match cqs::splade::SpladeEncoder::new(&model_dir, 0.01) {
        Ok(encoder) => {
            tracing::info!(
                model_dir = %model_dir.display(),
                "SPLADE encoder loaded for incremental encoding"
            );
            Some(encoder)
        }
        Err(err) => {
            tracing::warn!(
                model_dir = %model_dir.display(),
                error = %err,
                "SPLADE encoder load failed — existing sparse_vectors untouched, \
                coverage will drift until manual 'cqs index'"
            );
            None
        }
    }
}
/// Re-encodes SPLADE sparse vectors for every chunk belonging to the files
/// just reindexed, then upserts them into the store.
///
/// Best-effort: fetch, encode, and upsert failures are logged and skipped so
/// one bad file or batch never aborts the watch cycle. Chunks are encoded in
/// batches of `splade_batch_size()` while holding the encoder mutex.
fn encode_splade_for_changed_files(
    encoder_mu: &std::sync::Mutex<cqs::splade::SpladeEncoder>,
    store: &Store,
    changed_files: &[PathBuf],
) {
    let batch_size = splade_batch_size();
    let _span = tracing::info_span!(
        "encode_splade_for_changed_files",
        n_files = changed_files.len(),
        batch_size
    )
    .entered();
    // Gather (chunk_id, content) pairs for all chunks of all changed files.
    let mut batch: Vec<(String, String)> = Vec::new();
    for file in changed_files {
        let origin = file.display().to_string();
        let chunks = match store.get_chunks_by_origin(&origin) {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!(
                    origin = %origin,
                    error = %e,
                    "SPLADE encode: failed to fetch chunks for file — skipping"
                );
                continue;
            }
        };
        for chunk in chunks {
            batch.push((chunk.id, chunk.content));
        }
    }
    if batch.is_empty() {
        tracing::debug!("SPLADE encode: no chunks to encode, nothing to do");
        return;
    }
    let mut encoded: Vec<(String, cqs::splade::SparseVector)> = Vec::with_capacity(batch.len());
    // A poisoned mutex is recovered rather than propagated.
    let encoder = match encoder_mu.lock() {
        Ok(e) => e,
        Err(poisoned) => {
            tracing::warn!("SPLADE encoder mutex poisoned — recovering");
            poisoned.into_inner()
        }
    };
    for sub in batch.chunks(batch_size) {
        let texts: Vec<&str> = sub.iter().map(|(_, t)| t.as_str()).collect();
        match encoder.encode_batch(&texts) {
            Ok(sparse_batch) => {
                for ((chunk_id, _), sparse) in sub.iter().zip(sparse_batch.into_iter()) {
                    encoded.push((chunk_id.clone(), sparse));
                }
                tracing::debug!(batch_size = sub.len(), "SPLADE batch encoded");
            }
            Err(e) => {
                // Skip just this batch; remaining batches still get encoded.
                tracing::warn!(
                    batch_size = sub.len(),
                    error = %e,
                    "SPLADE batch encode failed — skipping batch"
                );
            }
        }
    }
    // Release the encoder before the (potentially slow) store upsert.
    drop(encoder);
    if encoded.is_empty() {
        return;
    }
    match store.upsert_sparse_vectors(&encoded) {
        Ok(inserted) => tracing::info!(
            chunks_encoded = encoded.len(),
            rows_inserted = inserted,
            "SPLADE incremental encode complete"
        ),
        Err(e) => tracing::warn!(
            error = %e,
            "SPLADE upsert failed — sparse_vectors not updated for this cycle"
        ),
    }
}
/// Batch size for incremental SPLADE encoding. Overridable via
/// `CQS_SPLADE_BATCH` (values <= 0 or unparsable fall back); defaults to 32.
fn splade_batch_size() -> usize {
    match std::env::var("CQS_SPLADE_BATCH")
        .ok()
        .and_then(|raw| raw.parse::<usize>().ok())
    {
        Some(n) if n > 0 => n,
        _ => 32,
    }
}
/// Watches the project tree and incrementally reindexes changed files.
///
/// Optionally (`serve`, unix only) also runs the query daemon: a Unix-socket
/// accept loop on a background thread answering one-line JSON requests via
/// `handle_socket_client`.
///
/// * `debounce_ms` — quiet period before a reindex cycle (overridable via
///   `CQS_WATCH_DEBOUNCE_MS`, auto-bumped under WSL/poll).
/// * `no_ignore` — disable gitignore filtering of watch events.
/// * `poll` — force the polling watcher instead of native notifications.
/// * `serve` — start the daemon socket listener alongside the watcher.
///
/// Returns when interrupted (Ctrl+C) or, on unix, after SIGTERM.
pub fn cmd_watch(
    cli: &Cli,
    debounce_ms: u64,
    no_ignore: bool,
    poll: bool,
    serve: bool,
) -> Result<()> {
    let _span = tracing::info_span!("cmd_watch", debounce_ms, poll, serve, no_ignore).entered();
    #[cfg(unix)]
    install_sigterm_handler();
    let root = find_project_root();
    // Polling is forced on WSL paths that live on Windows filesystem mounts,
    // where inotify is unreliable.
    let use_poll = poll
        || (cqs::config::is_wsl()
            && root
                .to_str()
                .is_some_and(|p| p.starts_with("//wsl") || is_under_wsl_automount(p)));
    if cqs::config::is_wsl() && !use_poll {
        tracing::warn!("WSL detected: inotify may be unreliable on Windows filesystem mounts. Use --poll or 'cqs index' periodically.");
    }
    // Debounce priority: env var > WSL/poll auto-bump (only when the caller
    // left the 500ms default) > CLI value.
    let debounce_ms = if let Some(env_ms) = std::env::var("CQS_WATCH_DEBOUNCE_MS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
    {
        env_ms
    } else if debounce_ms == 500 && use_poll {
        tracing::info!(
            "Auto-bumping watch debounce to 1500ms for WSL/poll mode (override via --debounce or CQS_WATCH_DEBOUNCE_MS)"
        );
        1500
    } else {
        debounce_ms
    };
    let cqs_dir = cqs::resolve_index_dir(&root);
    let index_path = cqs_dir.join(cqs::INDEX_DB_FILENAME);
    if !index_path.exists() {
        bail!("No index found. Run 'cqs index' first.");
    }
    // Daemon socket setup: probe an existing socket (connectable => another
    // daemon is live, bail), otherwise remove a stale socket file — but only
    // if it really is a socket or plain file, never a symlink/dir.
    #[cfg(unix)]
    let mut socket_listener = if serve {
        let sock_path = super::daemon_socket_path(&cqs_dir);
        if sock_path.exists() {
            match std::os::unix::net::UnixStream::connect(&sock_path) {
                Ok(_) => {
                    anyhow::bail!(
                        "Another daemon is already listening on {}",
                        sock_path.display()
                    );
                }
                Err(_) => {
                    use std::os::unix::fs::FileTypeExt;
                    match std::fs::symlink_metadata(&sock_path) {
                        Ok(md) => {
                            let ft = md.file_type();
                            if ft.is_symlink() || ft.is_dir() {
                                anyhow::bail!(
                                    "Refusing to remove non-socket path {} (symlink/dir); resolve manually before starting daemon",
                                    sock_path.display()
                                );
                            }
                            if !(ft.is_socket() || ft.is_file()) {
                                anyhow::bail!(
                                    "Refusing to remove non-socket path {} (unexpected file type); resolve manually before starting daemon",
                                    sock_path.display()
                                );
                            }
                            if let Err(e) = std::fs::remove_file(&sock_path) {
                                tracing::warn!(
                                    error = %e,
                                    path = %sock_path.display(),
                                    "Failed to remove stale socket file"
                                );
                            } else {
                                tracing::debug!(path = %sock_path.display(), "Removed stale socket file");
                            }
                        }
                        // Raced with another cleanup — nothing to do.
                        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        }
                        Err(e) => {
                            tracing::warn!(
                                error = %e,
                                path = %sock_path.display(),
                                "Failed to stat socket path before cleanup"
                            );
                        }
                    }
                }
            }
        }
        // Non-blocking accept lets the daemon loop poll for shutdown.
        let listener = std::os::unix::net::UnixListener::bind(&sock_path)
            .with_context(|| format!("Failed to bind socket at {}", sock_path.display()))?;
        listener.set_nonblocking(true)?;
        {
            // Restrict the socket to the owning user.
            use std::os::unix::fs::PermissionsExt;
            if let Err(e) =
                std::fs::set_permissions(&sock_path, std::fs::Permissions::from_mode(0o600))
            {
                tracing::warn!(
                    error = %e,
                    path = %sock_path.display(),
                    "Failed to set socket permissions to 0o600"
                );
            }
        }
        tracing::info!(
            socket = %sock_path.display(),
            pid = std::process::id(),
            "Daemon listening"
        );
        if !cli.quiet {
            println!("Daemon listening on {}", sock_path.display());
        }
        // Snapshot CQS_* env vars for post-mortem debugging of daemon config.
        let cqs_vars: Vec<(String, String)> = std::env::vars()
            .filter(|(k, _)| k.starts_with("CQS_"))
            .collect();
        tracing::info!(cqs_vars = ?cqs_vars, "Daemon env snapshot");
        Some((listener, sock_path))
    } else {
        None
    };
    // Guard removes the socket file when cmd_watch returns.
    #[cfg(unix)]
    let _socket_guard = socket_listener
        .as_ref()
        .map(|(_, path)| SocketCleanupGuard(path.clone()));
    #[cfg(not(unix))]
    if serve {
        eprintln!(
            "Warning: --serve is unix-only (daemon socket uses Unix domain sockets); \
            falling back to plain watch mode"
        );
        tracing::warn!("--serve requested on non-unix platform; daemon disabled");
    }
    // Embedder and runtime are shared between the watch loop and the daemon
    // thread so the model is only loaded once.
    let shared_embedder: std::sync::Arc<std::sync::OnceLock<std::sync::Arc<Embedder>>> =
        std::sync::Arc::new(std::sync::OnceLock::new());
    let shared_rt = build_shared_runtime()
        .with_context(|| "Failed to build shared tokio runtime for daemon")?;
    // Daemon accept loop on its own thread: builds a BatchContext, adopts (or
    // creates and publishes) the shared embedder, then serves clients until
    // shutdown, capped at MAX_CONCURRENT_DAEMON_CLIENTS in-flight.
    #[cfg(unix)]
    let mut socket_thread: Option<std::thread::JoinHandle<()>> = if serve {
        if let Some((listener, _)) = socket_listener.take() {
            let daemon_embedder = std::sync::Arc::clone(&shared_embedder);
            let daemon_model_config = cli.try_model_config()?.clone();
            let daemon_runtime = Arc::clone(&shared_rt);
            let thread = std::thread::spawn(move || {
                let ctx = match super::batch::create_context_with_runtime(Some(daemon_runtime)) {
                    Ok(ctx) => {
                        if let Some(existing) = daemon_embedder.get() {
                            ctx.adopt_embedder(std::sync::Arc::clone(existing));
                            tracing::info!("Daemon adopted shared embedder");
                        } else {
                            match Embedder::new(daemon_model_config) {
                                Ok(emb) => {
                                    // get_or_init resolves the race: whichever
                                    // embedder "wins" is the one adopted.
                                    let arc = std::sync::Arc::new(emb);
                                    let winning_arc =
                                        daemon_embedder.get_or_init(|| std::sync::Arc::clone(&arc));
                                    ctx.adopt_embedder(std::sync::Arc::clone(winning_arc));
                                    tracing::info!("Daemon built and shared embedder");
                                }
                                Err(e) => {
                                    tracing::warn!(error = %e, "Daemon embedder init failed — will retry lazily");
                                }
                            }
                        }
                        ctx.warm();
                        ctx
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "Daemon BatchContext creation failed");
                        return;
                    }
                };
                let ctx = Arc::new(Mutex::new(ctx));
                let in_flight = Arc::new(AtomicUsize::new(0));
                tracing::info!(
                    max_concurrent = MAX_CONCURRENT_DAEMON_CLIENTS,
                    "Daemon query thread ready"
                );
                let mut last_idle_sweep = std::time::Instant::now();
                let idle_sweep_interval = Duration::from_secs(60);
                loop {
                    if daemon_should_exit() {
                        tracing::info!("Daemon accept loop draining on shutdown signal");
                        break;
                    }
                    // Periodic session sweep; try_lock so we never block the
                    // accept loop behind a long-running query.
                    if last_idle_sweep.elapsed() >= idle_sweep_interval {
                        if let Ok(ctx_guard) = ctx.try_lock() {
                            ctx_guard.sweep_idle_sessions();
                        }
                        last_idle_sweep = std::time::Instant::now();
                    }
                    match listener.accept() {
                        Ok((stream, _addr)) => {
                            let current = in_flight.load(Ordering::Acquire);
                            if current >= MAX_CONCURRENT_DAEMON_CLIENTS {
                                let mut s = stream;
                                let _ = write_daemon_error(
                                    &mut s,
                                    "daemon busy (too many concurrent clients)",
                                );
                                tracing::warn!(
                                    in_flight = current,
                                    cap = MAX_CONCURRENT_DAEMON_CLIENTS,
                                    "Rejecting new daemon connection — at concurrency cap"
                                );
                                continue;
                            }
                            in_flight.fetch_add(1, Ordering::AcqRel);
                            let ctx_clone = Arc::clone(&ctx);
                            let in_flight_clone = Arc::clone(&in_flight);
                            if let Err(e) = std::thread::Builder::new()
                                .name("cqs-daemon-client".to_string())
                                .spawn(move || {
                                    handle_socket_client(stream, &ctx_clone);
                                    in_flight_clone.fetch_sub(1, Ordering::AcqRel);
                                })
                            {
                                // Spawn failed: undo the reservation.
                                in_flight.fetch_sub(1, Ordering::AcqRel);
                                tracing::warn!(
                                    error = %e,
                                    "Failed to spawn daemon client thread — dropping connection"
                                );
                            }
                        }
                        // Non-blocking listener: no pending connection.
                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            std::thread::sleep(Duration::from_millis(100));
                        }
                        Err(e) => {
                            tracing::warn!(error = %e, "Socket accept failed");
                        }
                    }
                }
            });
            Some(thread)
        } else {
            None
        }
    } else {
        None
    };
    let parser = CqParser::new()?;
    let supported_ext: HashSet<_> = parser.supported_extensions().iter().cloned().collect();
    println!(
        "Watching {} for changes (Ctrl+C to stop)...",
        root.display()
    );
    println!(
        "Code extensions: {}",
        supported_ext.iter().cloned().collect::<Vec<_>>().join(", ")
    );
    println!("Also watching: docs/notes.toml");
    if cqs::splade::resolve_splade_model_dir().is_some() {
        println!(
            "⚠ SPLADE model configured but watch mode does not refresh sparse vectors for \
            newly-added chunks. Run 'cqs index' after a stable edit session to restore \
            full SPLADE coverage. Sparse correctness for removed chunks is maintained \
            automatically via the v20 schema trigger."
        );
        tracing::warn!(
            "Watch mode does not re-run SPLADE encoding — new chunks will have no sparse \
            vectors until manual 'cqs index'. Removals are handled via the v20 chunks-delete \
            trigger."
        );
    }
    let (tx, rx) = mpsc::channel();
    let config = Config::default().with_poll_interval(Duration::from_millis(debounce_ms));
    let mut watcher: Box<dyn Watcher> = if use_poll {
        println!("Using poll watcher (interval: {}ms)", debounce_ms);
        Box::new(PollWatcher::new(tx, config)?)
    } else {
        Box::new(RecommendedWatcher::new(tx, config)?)
    };
    watcher.watch(&root, RecursiveMode::Recursive)?;
    let debounce = Duration::from_millis(debounce_ms);
    let notes_path = root.join("docs/notes.toml");
    // Canonicalize so event-path comparisons in collect_events line up.
    let cqs_dir = dunce::canonicalize(&cqs_dir).unwrap_or_else(|e| {
        tracing::debug!(path = %cqs_dir.display(), error = %e, "canonicalize failed, using original");
        cqs_dir
    });
    let notes_path = dunce::canonicalize(&notes_path).unwrap_or_else(|e| {
        tracing::debug!(path = %notes_path.display(), error = %e, "canonicalize failed, using original");
        notes_path
    });
    let mut store = Store::open_with_runtime(&index_path, Arc::clone(&shared_rt))
        .with_context(|| format!("Failed to open store at {}", index_path.display()))?;
    // Identity of index.db, used to detect the file being swapped out.
    let mut db_id = db_file_identity(&index_path);
    // Reusing a prior HNSW index starts the incremental counter at half the
    // rebuild threshold so a full rebuild happens sooner than from-fresh.
    let (hnsw_index, incremental_count) =
        match HnswIndex::load_with_dim(cqs_dir.as_ref(), "index", store.dim()) {
            Ok(index) => {
                info!(vectors = index.len(), "Loaded existing HNSW index");
                (Some(index), hnsw_rebuild_threshold() / 2)
            }
            Err(ref e) if matches!(e, cqs::hnsw::HnswError::NotFound(_)) => {
                tracing::debug!("No prior HNSW index, starting fresh");
                (None, 0)
            }
            Err(e) => {
                tracing::warn!(error = %e, "Existing HNSW index unusable, rebuilding from scratch");
                (None, 0)
            }
        };
    let model_config = cli.try_model_config()?;
    let gitignore = std::sync::RwLock::new(if no_ignore {
        tracing::info!("--no-ignore passed — gitignore filtering disabled");
        None
    } else {
        build_gitignore_matcher(&root)
    });
    let splade_encoder_storage = build_splade_encoder_for_watch().map(std::sync::Mutex::new);
    let splade_encoder_ref: Option<&std::sync::Mutex<cqs::splade::SpladeEncoder>> =
        splade_encoder_storage.as_ref();
    let watch_cfg = WatchConfig {
        root: &root,
        cqs_dir: &cqs_dir,
        notes_path: &notes_path,
        supported_ext: &supported_ext,
        parser: &parser,
        embedder: shared_embedder.as_ref(),
        quiet: cli.quiet,
        model_config,
        gitignore: &gitignore,
        splade_encoder: splade_encoder_ref,
    };
    let mut state = WatchState {
        embedder_backoff: EmbedderBackoff::new(),
        pending_files: HashSet::new(),
        pending_notes: false,
        last_event: std::time::Instant::now(),
        last_indexed_mtime: HashMap::with_capacity(1024),
        hnsw_index,
        incremental_count,
        dropped_this_cycle: 0,
    };
    let mut cycles_since_clear: u32 = 0;
    let mut last_cache_evict = std::time::Instant::now();
    // Main loop: drain watcher events, and on each 100ms timeout decide
    // whether the debounce window has elapsed and a reindex cycle should run.
    loop {
        match rx.recv_timeout(Duration::from_millis(100)) {
            Ok(Ok(event)) => {
                collect_events(&event, &watch_cfg, &mut state);
            }
            Ok(Err(e)) => {
                warn!(error = %e, "Watch error");
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                let should_process = (!state.pending_files.is_empty() || state.pending_notes)
                    && state.last_event.elapsed() >= debounce;
                if should_process {
                    cycles_since_clear = 0;
                    // Serialize with other cqs processes via the index lock;
                    // if we can't get it, retry on a later timeout tick.
                    let lock = match try_acquire_index_lock(&cqs_dir) {
                        Ok(Some(lock)) => lock,
                        Ok(None) => {
                            info!("Index lock held by another process, skipping reindex cycle");
                            continue;
                        }
                        Err(e) => {
                            warn!(error = %e, "Failed to create index lock file");
                            continue;
                        }
                    };
                    // Reopen the store if index.db was replaced underneath us.
                    let current_id = db_file_identity(&index_path);
                    if current_id != db_id {
                        info!("index.db replaced (likely cqs index --force), reopening store");
                        drop(store);
                        store = Store::open_with_runtime(&index_path, Arc::clone(&shared_rt))
                            .with_context(|| {
                                format!(
                                    "Failed to re-open store at {} after DB replacement",
                                    index_path.display()
                                )
                            })?;
                        state.hnsw_index = None;
                        state.incremental_count = 0;
                    }
                    if !state.pending_files.is_empty() {
                        process_file_changes(&watch_cfg, &store, &mut state);
                    }
                    if state.pending_notes {
                        state.pending_notes = false;
                        process_note_changes(&root, &store, cli.quiet);
                    }
                    store.clear_caches();
                    db_id = db_file_identity(&index_path);
                    // At most once an hour, evict the global embedding cache.
                    if last_cache_evict.elapsed() >= Duration::from_secs(3600) {
                        super::batch::evict_global_embedding_cache_with_runtime(
                            "watch reindex cycle",
                            Some(Arc::clone(&shared_rt)),
                        );
                        last_cache_evict = std::time::Instant::now();
                    }
                    drop(lock);
                } else {
                    // Idle housekeeping: after ~3000 quiet ticks, release the
                    // embedder session and the in-memory HNSW index.
                    cycles_since_clear += 1;
                    if cycles_since_clear >= 3000 {
                        if let Some(emb) = shared_embedder.get() {
                            emb.clear_session();
                        }
                        state.hnsw_index = None;
                        state.incremental_count = 0;
                        cycles_since_clear = 0;
                    }
                }
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                bail!(
                    "File watcher disconnected unexpectedly. \
                    Hint: Restart 'cqs watch' to resume monitoring."
                );
            }
        }
        if check_interrupted() {
            println!("\nStopping watch...");
            break;
        }
        #[cfg(unix)]
        if is_shutdown_requested() {
            tracing::info!("SIGTERM received, draining watch loop");
            if !cli.quiet {
                println!("\nSIGTERM received, stopping watch...");
            }
            break;
        }
    }
    // Give the daemon thread up to 5s to notice the shutdown flag and exit;
    // after that, detach rather than hang process shutdown.
    #[cfg(unix)]
    if let Some(handle) = socket_thread.take() {
        let deadline = std::time::Instant::now() + Duration::from_secs(5);
        let poll = Duration::from_millis(50);
        let mut handle_opt = Some(handle);
        while std::time::Instant::now() < deadline {
            match handle_opt.as_ref() {
                Some(h) if h.is_finished() => {
                    if let Err(e) = handle_opt.take().unwrap().join() {
                        tracing::warn!(?e, "Daemon socket thread panicked during shutdown");
                    } else {
                        tracing::info!("Daemon socket thread joined cleanly");
                    }
                    break;
                }
                Some(_) => std::thread::sleep(poll),
                None => break,
            }
        }
        if handle_opt.is_some() {
            tracing::warn!(
                "Daemon socket thread did not finish within 5s shutdown window — detaching (BatchContext Drop may race with process exit)"
            );
        }
    }
    Ok(())
}
fn collect_events(event: ¬ify::Event, cfg: &WatchConfig, state: &mut WatchState) {
for path in &event.paths {
let path = if path.exists() {
dunce::canonicalize(path).unwrap_or_else(|_| path.clone())
} else {
path.clone()
};
let norm_path = cqs::normalize_path(&path);
let norm_cqs = cqs::normalize_path(cfg.cqs_dir);
if norm_path.starts_with(&norm_cqs) {
tracing::debug!(path = %norm_path, "Skipping .cqs directory event");
continue;
}
if let Ok(matcher_guard) = cfg.gitignore.read() {
if let Some(matcher) = matcher_guard.as_ref() {
if matcher
.matched_path_or_any_parents(&path, false)
.is_ignore()
{
tracing::trace!(
path = %norm_path,
"Skipping gitignore-matched path (#1002)"
);
continue;
}
}
}
let norm_notes = cqs::normalize_path(cfg.notes_path);
if norm_path == norm_notes {
state.pending_notes = true;
state.last_event = std::time::Instant::now();
continue;
}
let ext_raw = path.extension().and_then(|e| e.to_str()).unwrap_or("");
let ext = ext_raw.to_ascii_lowercase();
if !cfg.supported_ext.contains(ext.as_str()) {
tracing::debug!(path = %path.display(), ext = %ext, "Skipping unsupported extension");
continue;
}
if let Ok(rel) = path.strip_prefix(cfg.root) {
if let Ok(mtime) = std::fs::metadata(&path).and_then(|m| m.modified()) {
if state
.last_indexed_mtime
.get(rel)
.is_some_and(|last| mtime <= *last)
{
tracing::trace!(path = %rel.display(), "Skipping unchanged mtime");
continue;
}
}
if state.pending_files.len() < max_pending_files() {
state.pending_files.insert(rel.to_path_buf());
} else {
state.dropped_this_cycle = state.dropped_this_cycle.saturating_add(1);
tracing::debug!(
max = max_pending_files(),
path = %rel.display(),
"Watch pending_files full, dropping file event"
);
}
state.last_event = std::time::Instant::now();
}
}
}
fn process_file_changes(cfg: &WatchConfig, store: &Store, state: &mut WatchState) {
let files: Vec<PathBuf> = state.pending_files.drain().collect();
let _span = info_span!("process_file_changes", file_count = files.len()).entered();
state.pending_files.shrink_to(64);
if state.dropped_this_cycle > 0 {
tracing::warn!(
dropped = state.dropped_this_cycle,
cap = max_pending_files(),
"Watch event queue full this cycle; dropping events. Run `cqs index` to catch up"
);
state.dropped_this_cycle = 0;
}
if !cfg.quiet {
println!("\n{} file(s) changed, reindexing...", files.len());
for f in &files {
println!(" {}", f.display());
}
}
let emb = match try_init_embedder(cfg.embedder, &mut state.embedder_backoff, cfg.model_config) {
Some(e) => e,
None => return,
};
let pre_mtimes: HashMap<PathBuf, SystemTime> = files
.iter()
.filter_map(|f| {
std::fs::metadata(cfg.root.join(f))
.and_then(|m| m.modified())
.ok()
.map(|t| (f.clone(), t))
})
.collect();
if let Err(e) = store.set_hnsw_dirty(cqs::HnswKind::Enriched, true) {
tracing::warn!(error = %e, "Cannot set enriched HNSW dirty flag — skipping reindex to prevent stale index on crash");
return;
}
if let Err(e) = store.set_hnsw_dirty(cqs::HnswKind::Base, true) {
tracing::warn!(error = %e, "Cannot set base HNSW dirty flag — skipping reindex to prevent stale index on crash");
return;
}
match reindex_files(cfg.root, store, &files, cfg.parser, emb, cfg.quiet) {
Ok((count, content_hashes)) => {
for (file, mtime) in pre_mtimes {
state.last_indexed_mtime.insert(file, mtime);
}
if state.last_indexed_mtime.len() > 5_000 {
state
.last_indexed_mtime
.retain(|f, _| cfg.root.join(f).exists());
}
if !cfg.quiet {
println!("Indexed {} chunk(s)", count);
}
if count > 0 {
match cfg.splade_encoder {
Some(encoder_mu) => {
encode_splade_for_changed_files(encoder_mu, store, &files);
}
None if cqs::splade::resolve_splade_model_dir().is_some() => {
tracing::debug!(
new_chunks = count,
"SPLADE model present but encoder disabled this daemon — \
sparse coverage will drift until manual 'cqs index' \
(CQS_WATCH_INCREMENTAL_SPLADE=0 or load failed)"
);
}
None => {
}
}
}
let needs_full_rebuild =
state.hnsw_index.is_none() || state.incremental_count >= hnsw_rebuild_threshold();
if needs_full_rebuild {
match super::commands::build_hnsw_index_owned(store, cfg.cqs_dir) {
Ok(Some(index)) => {
let n = index.len();
state.hnsw_index = Some(index);
state.incremental_count = 0;
if let Err(e) = store.set_hnsw_dirty(cqs::HnswKind::Enriched, false) {
tracing::warn!(error = %e, "Failed to clear enriched HNSW dirty flag — unnecessary rebuild on next load");
}
info!(vectors = n, "HNSW index rebuilt (full)");
if !cfg.quiet {
println!(" HNSW index: {} vectors (full rebuild)", n);
}
}
Ok(None) => {
state.hnsw_index = None;
}
Err(e) => {
warn!(error = %e, "HNSW rebuild failed, removing stale HNSW files (search falls back to brute-force)");
state.hnsw_index = None;
for ext in cqs::hnsw::HNSW_ALL_EXTENSIONS {
let path = cfg.cqs_dir.join(format!("index.{}", ext));
if let Err(e) = std::fs::remove_file(&path) {
if e.kind() != std::io::ErrorKind::NotFound {
tracing::warn!(
error = %e,
path = %path.display(),
"Failed to delete stale HNSW file"
);
}
}
let base_path = cfg.cqs_dir.join(format!("index_base.{}", ext));
if let Err(e) = std::fs::remove_file(&base_path) {
if e.kind() != std::io::ErrorKind::NotFound {
tracing::warn!(
error = %e,
path = %base_path.display(),
"Failed to delete stale base HNSW file"
);
}
}
}
}
}
match super::commands::build_hnsw_base_index(store, cfg.cqs_dir) {
Ok(Some(n)) => {
info!(vectors = n, "Base HNSW index rebuilt");
if let Err(e) = store.set_hnsw_dirty(cqs::HnswKind::Base, false) {
tracing::warn!(error = %e, "Failed to clear base HNSW dirty flag — unnecessary rebuild on next load");
}
if !cfg.quiet {
println!(" HNSW base index: {} vectors (full rebuild)", n);
}
}
Ok(None) => {
}
Err(e) => {
warn!(error = %e, "Base HNSW rebuild failed, router falls back to enriched-only");
}
}
} else if !content_hashes.is_empty() {
let hash_refs: Vec<&str> = content_hashes.iter().map(|s| s.as_str()).collect();
match store.get_chunk_ids_and_embeddings_by_hashes(&hash_refs) {
Ok(pairs) if !pairs.is_empty() => {
let items: Vec<(String, &[f32])> = pairs
.iter()
.map(|(id, emb)| (id.clone(), emb.as_slice()))
.collect();
if let Some(ref mut index) = state.hnsw_index {
match index.insert_batch(&items) {
Ok(n) => {
state.incremental_count += n;
if let Err(e) = index.save(cfg.cqs_dir, "index") {
warn!(error = %e, "Failed to save HNSW after incremental insert");
} else if let Err(e) =
store.set_hnsw_dirty(cqs::HnswKind::Enriched, false)
{
tracing::warn!(error = %e, "Failed to clear enriched HNSW dirty flag — unnecessary rebuild on next load");
}
info!(
inserted = n,
total = index.len(),
incremental_count = state.incremental_count,
"HNSW incremental insert"
);
if !cfg.quiet {
println!(
" HNSW index: +{} vectors (incremental, {} total)",
n,
index.len()
);
}
}
Err(e) => {
warn!(error = %e, "HNSW incremental insert failed, will rebuild next cycle");
state.hnsw_index = None;
}
}
}
}
Ok(_) => {} Err(e) => {
warn!(error = %e, "Failed to fetch embeddings for HNSW incremental insert");
}
}
}
}
Err(e) => {
warn!(error = %e, "Reindex error");
}
}
}
fn process_note_changes(root: &Path, store: &Store, quiet: bool) {
if !quiet {
println!("\nNotes changed, reindexing...");
}
match reindex_notes(root, store, quiet) {
Ok(count) => {
if !quiet {
println!("Indexed {} note(s)", count);
}
}
Err(e) => {
warn!(error = %e, "Notes reindex error");
}
}
}
/// Parse, embed, and upsert the given `files` (paths relative to `root`) into
/// the store.
///
/// Handles three cases per file: a deleted file has its chunks removed via
/// `delete_by_origin`; a parse failure is logged and skipped; a parsed file
/// contributes chunks (rebased onto relative paths), call sites, and type
/// refs. Embeddings are reused from the store when the chunk's content hash
/// is already known; only cache misses are re-embedded.
///
/// Returns `(total_chunk_count, content_hashes_of_newly_embedded_chunks)` —
/// the hashes let the caller do incremental HNSW insertion for just the new
/// vectors.
fn reindex_files(
    root: &Path,
    store: &Store,
    files: &[PathBuf],
    parser: &CqParser,
    embedder: &Embedder,
    quiet: bool,
) -> Result<(usize, Vec<String>)> {
    let _span = info_span!("reindex_files", file_count = files.len()).entered();
    info!(file_count = files.len(), "Reindexing files");
    let mut all_type_refs: Vec<(PathBuf, Vec<ChunkTypeRefs>)> = Vec::new();
    // Parse phase. NOTE: this flat_map has side effects (deleting chunks for
    // removed files, upserting function calls, accumulating type refs), so it
    // must visit every path even when a file yields no chunks.
    let chunks: Vec<_> = files
        .iter()
        .flat_map(|rel_path| {
            let abs_path = root.join(rel_path);
            if !abs_path.exists() {
                // File was deleted: purge its chunks, best-effort.
                if let Err(e) = store.delete_by_origin(rel_path) {
                    tracing::warn!(
                        path = %rel_path.display(),
                        error = %e,
                        "Failed to delete chunks for deleted file"
                    );
                }
                return vec![];
            }
            match parser.parse_file_all(&abs_path) {
                Ok((mut file_chunks, calls, chunk_type_refs)) => {
                    // The parser emits absolute paths; rebase file and chunk id
                    // onto the root-relative path so stored ids are portable.
                    for chunk in &mut file_chunks {
                        chunk.file = rel_path.clone();
                        if let Some(rest) = chunk.id.strip_prefix(&abs_path.display().to_string()) {
                            chunk.id = format!("{}{}", rel_path.display(), rest);
                        }
                    }
                    if !chunk_type_refs.is_empty() {
                        all_type_refs.push((rel_path.clone(), chunk_type_refs));
                    }
                    if !calls.is_empty() {
                        // Best-effort: a failed call-graph write must not abort
                        // the reindex of the remaining files.
                        if let Err(e) = store.upsert_function_calls(rel_path, &calls) {
                            tracing::warn!(
                                path = %rel_path.display(),
                                error = %e,
                                "Failed to write function_calls for watched file"
                            );
                        }
                    }
                    file_chunks
                }
                Err(e) => {
                    tracing::warn!(path = %abs_path.display(), error = %e, "Failed to parse file");
                    vec![]
                }
            }
        })
        .collect();
    // Split oversized chunks into embedder-sized windows before hashing.
    let chunks = crate::cli::pipeline::apply_windowing(chunks, embedder);
    if chunks.is_empty() {
        return Ok((0, Vec::new()));
    }
    // Embedding-cache split: chunks whose content hash already has a stored
    // embedding are reused; the rest are embedded below. Indices refer back
    // into `chunks` so results can be merged position-wise.
    let hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
    let existing = store.get_embeddings_by_hashes(&hashes)?;
    let mut cached: Vec<(usize, Embedding)> = Vec::new();
    let mut to_embed: Vec<(usize, &cqs::Chunk)> = Vec::new();
    for (i, chunk) in chunks.iter().enumerate() {
        if let Some(emb) = existing.get(&chunk.content_hash) {
            cached.push((i, emb.clone()));
        } else {
            to_embed.push((i, chunk));
        }
    }
    tracing::info!(
        cached = cached.len(),
        to_embed = to_embed.len(),
        "Embedding cache stats"
    );
    // Only freshly-embedded hashes are returned to the caller (for
    // incremental HNSW insertion); cached ones are already in the index.
    let content_hashes: Vec<String> = to_embed
        .iter()
        .map(|(_, c)| c.content_hash.clone())
        .collect();
    let new_embeddings: Vec<Embedding> = if to_embed.is_empty() {
        vec![]
    } else {
        // Embed the natural-language description of each chunk, not raw code.
        let texts: Vec<String> = to_embed
            .iter()
            .map(|(_, c)| generate_nl_description(c))
            .collect();
        let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
        embedder.embed_documents(&text_refs)?.into_iter().collect()
    };
    // Merge cached + new embeddings back into chunk order. Placeholder empty
    // embeddings are fully overwritten because cached/to_embed partition the
    // index range.
    let chunk_count = chunks.len();
    let mut embeddings: Vec<Embedding> = vec![Embedding::new(vec![]); chunk_count];
    for (i, emb) in cached {
        embeddings[i] = emb;
    }
    for ((i, _), emb) in to_embed.into_iter().zip(new_embeddings) {
        embeddings[i] = emb;
    }
    // Per-chunk call sites, keyed by chunk id, written alongside the chunks.
    let mut calls_by_id: HashMap<String, Vec<cqs::parser::CallSite>> = HashMap::new();
    for chunk in &chunks {
        let calls = parser.extract_calls_from_chunk(chunk);
        if !calls.is_empty() {
            calls_by_id
                .entry(chunk.id.clone())
                .or_default()
                .extend(calls);
        }
    }
    let mut mtime_cache: HashMap<PathBuf, Option<i64>> = HashMap::new();
    // Group chunk/embedding pairs per source file so each file can be
    // upserted atomically and phantom chunks pruned against its live ids.
    let mut by_file: HashMap<PathBuf, Vec<(cqs::Chunk, Embedding)>> = HashMap::new();
    for (chunk, embedding) in chunks.into_iter().zip(embeddings.into_iter()) {
        let file_key = chunk.file.clone();
        by_file
            .entry(file_key)
            .or_default()
            .push((chunk, embedding));
    }
    for (file, pairs) in &by_file {
        // File mtime in epoch milliseconds; None if the file vanished or the
        // platform can't report it.
        let mtime = *mtime_cache.entry(file.clone()).or_insert_with(|| {
            let abs_path = root.join(file);
            abs_path
                .metadata()
                .and_then(|m| m.modified())
                .ok()
                .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
                .map(|d| d.as_millis() as i64)
        });
        let file_calls: Vec<_> = pairs
            .iter()
            .flat_map(|(c, _)| {
                calls_by_id
                    .get(&c.id)
                    .into_iter()
                    .flat_map(|calls| calls.iter().map(|call| (c.id.clone(), call.clone())))
            })
            .collect();
        store.upsert_chunks_and_calls(pairs, mtime, &file_calls)?;
        // Remove chunks recorded for this file that no longer exist in source.
        let live_ids: Vec<&str> = pairs.iter().map(|(c, _)| c.id.as_str()).collect();
        store.delete_phantom_chunks(file, &live_ids)?;
    }
    // Type edges and the updated-at stamp are best-effort metadata.
    if let Err(e) = store.upsert_type_edges_for_files(&all_type_refs) {
        tracing::warn!(error = %e, "Failed to update type edges");
    }
    if let Err(e) = store.touch_updated_at() {
        tracing::warn!(error = %e, "Failed to update timestamp");
    }
    if !quiet {
        println!("Updated {} file(s)", files.len());
    }
    Ok((chunk_count, content_hashes))
}
/// Re-parse `docs/notes.toml` under a shared file lock and index its notes.
///
/// Returns the number of notes indexed; `Ok(0)` when the file is missing or
/// contains no notes.
fn reindex_notes(root: &Path, store: &Store, quiet: bool) -> Result<usize> {
    let _span = info_span!("reindex_notes").entered();
    let notes_path = root.join("docs/notes.toml");
    if !notes_path.exists() {
        return Ok(0);
    }
    // Scope the shared lock: it is held while the file is parsed and the
    // notes are indexed, and released (by dropping the handle at the end of
    // the block, or on any early return / `?`) before the stats query below.
    let count = {
        let lock_file = std::fs::File::open(&notes_path)?;
        lock_file.lock_shared()?;
        let notes = parse_notes(&notes_path)?;
        if notes.is_empty() {
            return Ok(0);
        }
        cqs::index_notes(&notes, &notes_path, store)?
    };
    if !quiet {
        let stats = store.note_stats()?;
        println!(
            " Notes: {} total ({} warnings, {} patterns)",
            stats.total, stats.warnings, stats.patterns
        );
    }
    Ok(count)
}
#[cfg(test)]
mod tests {
    use super::*;
    use notify::EventKind;
    use std::collections::{HashMap, HashSet};
    use std::path::PathBuf;

    /// Serializes tests that read or mutate process-wide environment variables.
    ///
    /// `cargo test` runs tests on multiple threads, while `std::env::set_var` /
    /// `remove_var` mutate process-global state: without a lock the four
    /// `CQS_SPLADE_BATCH` tests race each other, and the gitignore kill-switch
    /// test races `build_gitignore_matcher_real_file_loads_rules`, producing
    /// intermittent failures. Every test that touches a CQS_* env var (or
    /// calls a function that reads one) must hold this lock for its duration.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquire ENV_LOCK, recovering from poison so one panicking env test
    /// does not cascade spurious `PoisonError`s into every later env test.
    fn env_guard() -> std::sync::MutexGuard<'static, ()> {
        ENV_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Build a notify event with the given paths and kind.
    fn make_event(paths: Vec<PathBuf>, kind: EventKind) -> notify::Event {
        notify::Event {
            kind,
            paths,
            attrs: Default::default(),
        }
    }

    /// WatchConfig fixture with no gitignore matcher. `Box::leak` gives the
    /// borrowed fields a 'static lifetime; the leak is fine in tests.
    fn test_watch_config<'a>(
        root: &'a Path,
        cqs_dir: &'a Path,
        notes_path: &'a Path,
        supported_ext: &'a HashSet<&'a str>,
    ) -> WatchConfig<'a> {
        let parser = Box::leak(Box::new(CqParser::new().unwrap()));
        let embedder = Box::leak(Box::new(std::sync::OnceLock::new()));
        let model_config = Box::leak(Box::new(ModelConfig::default_model()));
        let gitignore = Box::leak(Box::new(std::sync::RwLock::new(None)));
        WatchConfig {
            root,
            cqs_dir,
            notes_path,
            supported_ext,
            parser,
            embedder,
            quiet: true,
            model_config,
            gitignore,
            splade_encoder: None,
        }
    }

    /// WatchConfig fixture with a pre-built gitignore matcher installed.
    fn test_watch_config_with_gitignore<'a>(
        root: &'a Path,
        cqs_dir: &'a Path,
        notes_path: &'a Path,
        supported_ext: &'a HashSet<&'a str>,
        matcher: ignore::gitignore::Gitignore,
    ) -> WatchConfig<'a> {
        let parser = Box::leak(Box::new(CqParser::new().unwrap()));
        let embedder = Box::leak(Box::new(std::sync::OnceLock::new()));
        let model_config = Box::leak(Box::new(ModelConfig::default_model()));
        let gitignore = Box::leak(Box::new(std::sync::RwLock::new(Some(matcher))));
        WatchConfig {
            root,
            cqs_dir,
            notes_path,
            supported_ext,
            parser,
            embedder,
            quiet: true,
            model_config,
            gitignore,
            splade_encoder: None,
        }
    }

    /// Fresh, empty WatchState.
    fn test_watch_state() -> WatchState {
        WatchState {
            embedder_backoff: EmbedderBackoff::new(),
            pending_files: HashSet::new(),
            pending_notes: false,
            last_event: std::time::Instant::now(),
            last_indexed_mtime: HashMap::new(),
            hnsw_index: None,
            incremental_count: 0,
            dropped_this_cycle: 0,
        }
    }

    #[test]
    fn backoff_initial_state_allows_retry() {
        let backoff = EmbedderBackoff::new();
        assert!(backoff.should_retry(), "Fresh backoff should allow retry");
    }

    #[test]
    fn backoff_after_failure_delays_retry() {
        let mut backoff = EmbedderBackoff::new();
        backoff.record_failure();
        assert!(
            !backoff.should_retry(),
            "Should not retry immediately after failure"
        );
        assert_eq!(backoff.failures, 1);
    }

    #[test]
    fn backoff_reset_clears_failures() {
        let mut backoff = EmbedderBackoff::new();
        backoff.record_failure();
        backoff.record_failure();
        backoff.reset();
        assert_eq!(backoff.failures, 0);
        assert!(backoff.should_retry());
    }

    #[test]
    fn backoff_caps_at_300s() {
        let mut backoff = EmbedderBackoff::new();
        for _ in 0..9 {
            backoff.record_failure();
        }
        assert_eq!(backoff.failures, 9);
    }

    #[test]
    fn backoff_saturating_add_no_overflow() {
        let mut backoff = EmbedderBackoff::new();
        backoff.failures = u32::MAX;
        backoff.record_failure();
        assert_eq!(backoff.failures, u32::MAX, "Should saturate, not overflow");
    }

    #[test]
    fn collect_events_filters_unsupported_extensions() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs", "py", "js"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/readme.txt")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            "Unsupported extension should not be added"
        );
        assert!(!state.pending_notes);
    }

    #[test]
    fn collect_events_skips_cqs_dir() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs", "db"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/.cqs/index.db")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            ".cqs dir events should be skipped"
        );
    }

    /// Build a Gitignore matcher rooted at `root` from literal pattern lines.
    fn gitignore_from_lines(root: &Path, lines: &[&str]) -> ignore::gitignore::Gitignore {
        let mut b = ignore::gitignore::GitignoreBuilder::new(root);
        for line in lines {
            b.add_line(None, line).expect("add_line");
        }
        b.build().expect("build gitignore")
    }

    #[test]
    fn collect_events_skips_gitignore_matched_paths() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let matcher = gitignore_from_lines(&root, &[".claude/", "target/"]);
        let cfg =
            test_watch_config_with_gitignore(&root, &cqs_dir, &notes_path, &supported, matcher);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from(
                "/tmp/test_project/.claude/worktrees/agent-a1b2c3d4/src/lib.rs",
            )],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            ".gitignore-matched path .claude/worktrees/... should be skipped"
        );
    }

    #[test]
    fn collect_events_skips_target_dir_via_gitignore() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let matcher = gitignore_from_lines(&root, &["target/"]);
        let cfg =
            test_watch_config_with_gitignore(&root, &cqs_dir, &notes_path, &supported, matcher);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/target/debug/foo.rs")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            "target/ ignored by .gitignore should be skipped"
        );
    }

    #[test]
    fn collect_events_does_not_skip_unrelated_paths_when_gitignore_present() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let matcher = gitignore_from_lines(&root, &[".claude/", "target/"]);
        let cfg =
            test_watch_config_with_gitignore(&root, &cqs_dir, &notes_path, &supported, matcher);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/src/foo.rs")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            !state.pending_files.is_empty(),
            "src/foo.rs is not in .gitignore and must not be skipped"
        );
    }

    #[test]
    fn collect_events_negations_include_path() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let matcher = gitignore_from_lines(&root, &["vendor/", "!vendor/keep/"]);
        let cfg =
            test_watch_config_with_gitignore(&root, &cqs_dir, &notes_path, &supported, matcher);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/vendor/keep/lib.rs")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            !state.pending_files.is_empty(),
            "negation `!vendor/keep/` must keep the file indexed"
        );
    }

    #[test]
    fn collect_events_honors_none_matcher() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from(
                "/tmp/test_project/.claude/worktrees/agent-x/src/lib.rs",
            )],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            !state.pending_files.is_empty(),
            "with matcher=None, all supported-ext paths must be accepted"
        );
    }

    #[test]
    fn collect_events_cqs_dir_skip_survives_gitignore_allowlist() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs", "db"].iter().cloned().collect();
        let matcher = gitignore_from_lines(&root, &["*.tmp", "!.cqs/"]);
        let cfg =
            test_watch_config_with_gitignore(&root, &cqs_dir, &notes_path, &supported, matcher);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/.cqs/index.db")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            ".cqs/ must always be skipped (belt-and-suspenders vs gitignore allowlist)"
        );
    }

    #[test]
    fn build_gitignore_matcher_missing_returns_none() {
        // build_gitignore_matcher reads CQS_WATCH_RESPECT_GITIGNORE.
        let _env = env_guard();
        let tmp = tempfile::TempDir::new().unwrap();
        assert!(
            build_gitignore_matcher(tmp.path()).is_none(),
            "missing .gitignore should yield None matcher"
        );
    }

    #[test]
    fn build_gitignore_matcher_env_kill_switch() {
        let _env = env_guard();
        let tmp = tempfile::TempDir::new().unwrap();
        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
        let prev = std::env::var("CQS_WATCH_RESPECT_GITIGNORE").ok();
        std::env::set_var("CQS_WATCH_RESPECT_GITIGNORE", "0");
        let result = build_gitignore_matcher(tmp.path());
        // Restore the prior value before asserting so a failure can't leak
        // the kill switch into other tests.
        match prev {
            Some(v) => std::env::set_var("CQS_WATCH_RESPECT_GITIGNORE", v),
            None => std::env::remove_var("CQS_WATCH_RESPECT_GITIGNORE"),
        }
        assert!(
            result.is_none(),
            "CQS_WATCH_RESPECT_GITIGNORE=0 must disable the matcher"
        );
    }

    #[test]
    fn build_gitignore_matcher_real_file_loads_rules() {
        // Reads CQS_WATCH_RESPECT_GITIGNORE; must not race the kill-switch test.
        let _env = env_guard();
        let tmp = tempfile::TempDir::new().unwrap();
        std::fs::write(
            tmp.path().join(".gitignore"),
            "target/\n.claude/\nnode_modules/\n",
        )
        .unwrap();
        let matcher =
            build_gitignore_matcher(tmp.path()).expect("matcher should build for real gitignore");
        assert!(matcher.num_ignores() >= 3, "expected ≥3 rules loaded");
        let hit = matcher
            .matched_path_or_any_parents(tmp.path().join("target/debug/foo.rs"), false)
            .is_ignore();
        assert!(hit, "target/ should match");
    }

    #[test]
    fn splade_batch_size_env_override() {
        let _env = env_guard();
        let prev = std::env::var("CQS_SPLADE_BATCH").ok();
        std::env::set_var("CQS_SPLADE_BATCH", "16");
        let got = splade_batch_size();
        match prev {
            Some(v) => std::env::set_var("CQS_SPLADE_BATCH", v),
            None => std::env::remove_var("CQS_SPLADE_BATCH"),
        }
        assert_eq!(got, 16);
    }

    #[test]
    fn splade_batch_size_default_is_32() {
        let _env = env_guard();
        let prev = std::env::var("CQS_SPLADE_BATCH").ok();
        std::env::remove_var("CQS_SPLADE_BATCH");
        let got = splade_batch_size();
        if let Some(v) = prev {
            std::env::set_var("CQS_SPLADE_BATCH", v);
        }
        assert_eq!(got, 32);
    }

    #[test]
    fn splade_batch_size_invalid_falls_back_to_default() {
        let _env = env_guard();
        let prev = std::env::var("CQS_SPLADE_BATCH").ok();
        std::env::set_var("CQS_SPLADE_BATCH", "not-a-number");
        let got = splade_batch_size();
        match prev {
            Some(v) => std::env::set_var("CQS_SPLADE_BATCH", v),
            None => std::env::remove_var("CQS_SPLADE_BATCH"),
        }
        assert_eq!(got, 32, "unparseable value falls back to default");
    }

    #[test]
    fn splade_batch_size_zero_falls_back_to_default() {
        let _env = env_guard();
        let prev = std::env::var("CQS_SPLADE_BATCH").ok();
        std::env::set_var("CQS_SPLADE_BATCH", "0");
        let got = splade_batch_size();
        match prev {
            Some(v) => std::env::set_var("CQS_SPLADE_BATCH", v),
            None => std::env::remove_var("CQS_SPLADE_BATCH"),
        }
        assert_eq!(got, 32, "0 is not a valid batch size, falls back");
    }

    #[test]
    fn build_splade_encoder_env_kill_switch_returns_none() {
        let _env = env_guard();
        let prev = std::env::var("CQS_WATCH_INCREMENTAL_SPLADE").ok();
        std::env::set_var("CQS_WATCH_INCREMENTAL_SPLADE", "0");
        let got = build_splade_encoder_for_watch();
        match prev {
            Some(v) => std::env::set_var("CQS_WATCH_INCREMENTAL_SPLADE", v),
            None => std::env::remove_var("CQS_WATCH_INCREMENTAL_SPLADE"),
        }
        assert!(
            got.is_none(),
            "CQS_WATCH_INCREMENTAL_SPLADE=0 must disable the encoder"
        );
    }

    #[test]
    fn collect_events_detects_notes_path() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let cqs_dir = root.join(".cqs");
        let notes_dir = root.join("docs");
        std::fs::create_dir_all(&notes_dir).unwrap();
        let notes_path = notes_dir.join("notes.toml");
        std::fs::write(&notes_path, "# notes").unwrap();
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let event = make_event(
            vec![notes_path.clone()],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(state.pending_notes, "Notes path should set pending_notes");
        assert!(
            state.pending_files.is_empty(),
            "Notes should not be added to pending_files"
        );
    }

    #[test]
    fn collect_events_respects_max_pending_files() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let cqs_dir = root.join(".cqs");
        let notes_path = root.join("docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        // Pre-fill to the cap; the next event must be dropped, not queued.
        for i in 0..max_pending_files() {
            state
                .pending_files
                .insert(PathBuf::from(format!("f{}.rs", i)));
        }
        let new_file = root.join("overflow.rs");
        std::fs::write(&new_file, "fn main() {}").unwrap();
        let event = make_event(
            vec![new_file],
            EventKind::Create(notify::event::CreateKind::File),
        );
        collect_events(&event, &cfg, &mut state);
        assert_eq!(
            state.pending_files.len(),
            max_pending_files(),
            "Should not exceed max_pending_files()"
        );
    }

    #[test]
    fn collect_events_skips_unchanged_mtime() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let cqs_dir = root.join(".cqs");
        let notes_path = root.join("docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let file = root.join("src/lib.rs");
        std::fs::create_dir_all(root.join("src")).unwrap();
        std::fs::write(&file, "fn main() {}").unwrap();
        // Record the file's current mtime as already indexed.
        let mtime = std::fs::metadata(&file).unwrap().modified().unwrap();
        state
            .last_indexed_mtime
            .insert(PathBuf::from("src/lib.rs"), mtime);
        let event = make_event(
            vec![file],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            "Unchanged mtime should be skipped"
        );
    }

    #[test]
    fn hnsw_rebuild_threshold_is_reasonable() {
        assert!(hnsw_rebuild_threshold() > 0);
        assert!(hnsw_rebuild_threshold() <= 1000);
    }

    #[test]
    fn max_pending_files_is_bounded() {
        assert!(max_pending_files() > 0);
        assert!(max_pending_files() <= 100_000);
    }
}