use std::cell::OnceCell;
use std::collections::{HashMap, HashSet};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::{Duration, SystemTime};
use anyhow::{bail, Context, Result};
use notify::{Config, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher};
use tracing::{info, info_span, warn};
use cqs::embedder::{Embedder, Embedding, ModelConfig};
use cqs::generate_nl_description;
use cqs::hnsw::HnswIndex;
use cqs::note::parse_notes;
use cqs::parser::{ChunkTypeRefs, Parser as CqParser};
use cqs::store::Store;
use super::{check_interrupted, find_project_root, try_acquire_index_lock, Cli};
/// RAII guard that removes the daemon's Unix socket file on drop, so a clean
/// shutdown does not leave a stale socket behind. Removal is best-effort:
/// failures are logged, never propagated, since this runs during teardown.
#[cfg(unix)]
struct SocketCleanupGuard(PathBuf);
#[cfg(unix)]
impl Drop for SocketCleanupGuard {
    fn drop(&mut self) {
        // Skip silently if the socket is already gone (e.g. removed externally).
        if self.0.exists() {
            if let Err(e) = std::fs::remove_file(&self.0) {
                tracing::warn!(path = %self.0.display(), error = %e, "Failed to remove socket file");
            } else {
                tracing::info!(path = %self.0.display(), "Daemon socket removed");
            }
        }
    }
}
/// Serve one daemon client connection: read a single JSON request line,
/// dispatch it through the shared [`BatchContext`], and write back a one-line
/// JSON reply (`{"status":"ok","output":...}` or `{"status":"error",...}`).
///
/// A panic inside dispatch is caught so one bad query cannot kill the daemon.
#[cfg(unix)]
fn handle_socket_client(
    mut stream: std::os::unix::net::UnixStream,
    batch_ctx: &super::batch::BatchContext,
) {
    let _span = tracing::info_span!("daemon_query").entered();
    let start = std::time::Instant::now();
    // Bound both directions so a stalled client cannot wedge this thread.
    stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(30))).ok();
    // Enforce the request-size cap *while* reading, not after: `take` stops
    // the underlying read at the limit, so an oversized (or newline-less)
    // request can never grow `line` without bound. Previously the size was
    // only checked after `read_line` had already buffered the whole payload.
    const MAX_REQUEST_BYTES: usize = 1_048_576;
    let limited = std::io::Read::take(&stream, (MAX_REQUEST_BYTES as u64) + 1);
    let mut reader = std::io::BufReader::new(limited);
    let mut line = String::new();
    match std::io::BufRead::read_line(&mut reader, &mut line) {
        // Client closed the connection without sending anything.
        Ok(0) => return,
        // Hit the cap (n == limit+1 means the line was truncated mid-read).
        Ok(n) if n > MAX_REQUEST_BYTES => {
            let _ = write_daemon_error(&mut stream, "request too large");
            return;
        }
        Err(e) => {
            tracing::debug!(error = %e, "Socket read failed");
            return;
        }
        Ok(_) => {}
    }
    let request: serde_json::Value = match serde_json::from_str(line.trim()) {
        Ok(v) => v,
        Err(e) => {
            let _ = write_daemon_error(&mut stream, &format!("invalid JSON: {e}"));
            return;
        }
    };
    // Request shape: {"command": "<verb>", "args": ["..."]}; non-string array
    // entries are silently dropped.
    let command = request
        .get("command")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    let args: Vec<String> = request
        .get("args")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect()
        })
        .unwrap_or_default();
    tracing::debug!(command, args = ?args, "Daemon request");
    if command.is_empty() {
        let _ = write_daemon_error(&mut stream, "missing 'command' field");
        return;
    }
    // Catch panics from dispatch so the daemon survives a single bad query.
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        // Re-quote args so the batch dispatcher sees one shell-style line.
        let full_line = if args.is_empty() {
            command.to_string()
        } else {
            format!("{} {}", command, shell_words::join(&args))
        };
        let mut output = Vec::new();
        batch_ctx.dispatch_line(&full_line, &mut output);
        String::from_utf8(output).map_err(|e| format!("non-UTF-8 output: {e}"))
    }));
    match result {
        Ok(Ok(output)) => {
            let resp = serde_json::json!({
                "status": "ok",
                "output": output.trim_end(),
            });
            let _ = writeln!(stream, "{}", resp);
        }
        Ok(Err(e)) => {
            let _ = write_daemon_error(&mut stream, &e);
        }
        Err(_) => {
            let _ = write_daemon_error(&mut stream, "internal error (panic in dispatch)");
            tracing::error!("Daemon query panicked — daemon continues");
        }
    }
    tracing::info!(
        command,
        latency_ms = start.elapsed().as_millis() as u64,
        "Daemon query complete"
    );
}
#[cfg(unix)]
fn write_daemon_error(
stream: &mut std::os::unix::net::UnixStream,
message: &str,
) -> std::io::Result<()> {
use std::io::Write;
let resp = serde_json::json!({ "status": "error", "message": message });
writeln!(stream, "{}", resp)
}
/// Stable identity of the index database file: (device, inode) on Unix.
/// Returns `None` when the file cannot be stat'ed. Used to detect the DB
/// being atomically replaced (same path, new inode).
#[cfg(unix)]
fn db_file_identity(path: &Path) -> Option<(u64, u64)> {
    use std::os::unix::fs::MetadataExt;
    std::fs::metadata(path)
        .ok()
        .map(|meta| (meta.dev(), meta.ino()))
}
/// Fallback identity on non-Unix platforms: the file's mtime. Weaker than the
/// (dev, inode) pair — a replacement with an identical mtime goes undetected.
#[cfg(not(unix))]
fn db_file_identity(path: &Path) -> Option<SystemTime> {
    std::fs::metadata(path).ok().and_then(|meta| meta.modified().ok())
}
/// Number of incremental HNSW inserts tolerated before forcing a full
/// rebuild. Read once from `CQS_WATCH_REBUILD_THRESHOLD` (default 100) and
/// cached for the life of the process.
fn hnsw_rebuild_threshold() -> usize {
    static CACHE: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *CACHE.get_or_init(|| {
        match std::env::var("CQS_WATCH_REBUILD_THRESHOLD") {
            // An unparseable value falls back to the default, same as unset.
            Ok(raw) => raw.parse().unwrap_or(100),
            Err(_) => 100,
        }
    })
}
/// Upper bound on queued file events between reindex cycles. Read once from
/// `CQS_WATCH_MAX_PENDING` (default 10_000) and cached for the process.
fn max_pending_files() -> usize {
    static CACHE: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *CACHE.get_or_init(|| {
        match std::env::var("CQS_WATCH_MAX_PENDING") {
            // An unparseable value falls back to the default, same as unset.
            Ok(raw) => raw.parse().unwrap_or(10_000),
            Err(_) => 10_000,
        }
    })
}
/// Immutable configuration shared by every reindex cycle of a watch session.
struct WatchConfig<'a> {
    /// Project root being watched; event paths are made relative to this.
    root: &'a Path,
    /// Index directory (`.cqs`); events under it are ignored.
    cqs_dir: &'a Path,
    /// Canonicalized path of `docs/notes.toml`, watched separately from code.
    notes_path: &'a Path,
    /// Lowercase file extensions the parser can handle.
    supported_ext: &'a HashSet<&'a str>,
    parser: &'a CqParser,
    /// Lazily-initialized embedder (init can fail; see `try_init_embedder`).
    embedder: &'a OnceCell<Embedder>,
    /// Suppress per-cycle stdout progress output.
    quiet: bool,
    model_config: &'a ModelConfig,
}
/// Mutable state carried across watch-loop iterations.
struct WatchState {
    /// Exponential backoff for failed embedder initialization.
    embedder_backoff: EmbedderBackoff,
    /// Changed files (relative to root) awaiting the next reindex cycle.
    pending_files: HashSet<PathBuf>,
    /// True when notes.toml changed since the last cycle.
    pending_notes: bool,
    /// Timestamp of the last accepted event; anchors the debounce window.
    last_event: std::time::Instant,
    /// mtime recorded per relative path at its last reindex, used to drop
    /// duplicate events for unchanged files.
    last_indexed_mtime: HashMap<PathBuf, SystemTime>,
    /// In-memory HNSW index; `None` forces a full rebuild next cycle.
    hnsw_index: Option<HnswIndex>,
    /// Incremental inserts since the last full rebuild (compared against
    /// `hnsw_rebuild_threshold()`).
    incremental_count: usize,
}
/// Exponential-backoff bookkeeping for embedder initialization retries.
struct EmbedderBackoff {
    /// Consecutive init failures (saturating).
    failures: u32,
    /// Earliest instant at which another init attempt is allowed.
    next_retry: std::time::Instant,
}
impl EmbedderBackoff {
    /// Fresh state: zero failures, retry allowed immediately.
    fn new() -> Self {
        Self {
            failures: 0,
            next_retry: std::time::Instant::now(),
        }
    }
    /// Record one init failure and push the next retry out exponentially:
    /// 2^failures seconds, capped at 300s (five minutes).
    fn record_failure(&mut self) {
        self.failures = self.failures.saturating_add(1);
        let wait_secs = 2u64.saturating_pow(self.failures).min(300);
        self.next_retry = std::time::Instant::now() + Duration::from_secs(wait_secs);
        warn!(
            failures = self.failures,
            next_retry_secs = wait_secs,
            "Embedder init failed, backing off"
        );
    }
    /// Clear all failure state; the next attempt may run immediately.
    fn reset(&mut self) {
        self.failures = 0;
        self.next_retry = std::time::Instant::now();
    }
    /// True once the current backoff window has elapsed.
    fn should_retry(&self) -> bool {
        self.next_retry <= std::time::Instant::now()
    }
}
fn try_init_embedder<'a>(
embedder: &'a OnceCell<Embedder>,
backoff: &mut EmbedderBackoff,
model_config: &ModelConfig,
) -> Option<&'a Embedder> {
match embedder.get() {
Some(e) => Some(e),
None => {
if !backoff.should_retry() {
return None;
}
match Embedder::new(model_config.clone()) {
Ok(e) => {
backoff.reset();
Some(embedder.get_or_init(|| e))
}
Err(e) => {
warn!(error = %e, "Failed to initialize embedder");
backoff.record_failure();
None
}
}
}
}
}
/// True when `path` lies under the WSL Windows-drive automount root
/// (default `/mnt/`, overridable via `[automount] root` in /etc/wsl.conf).
/// The root is resolved once per process and cached.
fn is_under_wsl_automount(path: &str) -> bool {
    static AUTOMOUNT_ROOT: std::sync::OnceLock<String> = std::sync::OnceLock::new();
    let root = AUTOMOUNT_ROOT.get_or_init(|| {
        parse_wsl_automount_root().unwrap_or_else(|| String::from("/mnt/"))
    });
    path.starts_with(root.as_str())
}
/// Extract the `root` key of the `[automount]` section from /etc/wsl.conf,
/// normalized to end with `/`. `None` when the file or key is absent.
fn parse_wsl_automount_root() -> Option<String> {
    let content = std::fs::read_to_string("/etc/wsl.conf").ok()?;
    let mut in_automount = false;
    for raw in content.lines() {
        let line = raw.trim();
        // Section header: remember whether we just entered [automount].
        if line.starts_with('[') {
            in_automount = line
                .trim_start_matches('[')
                .trim_end_matches(']')
                .trim()
                .eq_ignore_ascii_case("automount");
            continue;
        }
        if !in_automount {
            continue;
        }
        let Some((key, value)) = line.split_once('=') else {
            continue;
        };
        if !key.trim().eq_ignore_ascii_case("root") {
            continue;
        }
        let mut root = value.trim().to_string();
        // Guarantee a trailing slash so prefix checks match whole components.
        if !root.ends_with('/') {
            root.push('/');
        }
        return Some(root);
    }
    None
}
/// Watch the project tree for changes and incrementally reindex it.
///
/// Runs until interrupted (Ctrl+C via `check_interrupted`). With `serve`
/// (Unix only) a query-daemon thread also answers requests on a Unix socket.
///
/// * `debounce_ms` — quiet period after the last event before a reindex
///   cycle (doubles as the poll interval for the polling watcher).
/// * `no_ignore` — accepted but not yet implemented for watch mode.
/// * `poll` — force the polling watcher instead of OS notifications.
/// * `serve` — additionally run the socket daemon (Unix only).
///
/// # Errors
/// Fails when no index exists yet, the watcher or store cannot be created,
/// or the watcher channel disconnects.
pub fn cmd_watch(
    cli: &Cli,
    debounce_ms: u64,
    no_ignore: bool,
    poll: bool,
    serve: bool,
) -> Result<()> {
    let _span = tracing::info_span!("cmd_watch", debounce_ms, poll, serve).entered();
    if no_ignore {
        tracing::warn!("--no-ignore is not yet implemented for watch mode");
    }
    let root = find_project_root();
    // Prefer polling on WSL when the project sits on a Windows filesystem
    // mount, where inotify is known to be unreliable.
    let use_poll = poll
        || (cqs::config::is_wsl()
            && root
                .to_str()
                .is_some_and(|p| p.starts_with("//wsl") || is_under_wsl_automount(p)));
    if cqs::config::is_wsl() && !use_poll {
        tracing::warn!("WSL detected: inotify may be unreliable on Windows filesystem mounts. Use --poll or 'cqs index' periodically.");
    }
    let cqs_dir = cqs::resolve_index_dir(&root);
    let index_path = cqs_dir.join("index.db");
    if !index_path.exists() {
        bail!("No index found. Run 'cqs index' first.");
    }
    // --- Daemon socket setup (Unix only) -----------------------------------
    #[cfg(unix)]
    let mut socket_listener = if serve {
        let sock_path = super::daemon_socket_path(&cqs_dir);
        if sock_path.exists() {
            // Probe the existing socket: a successful connect means a live
            // daemon owns it; a failed connect means it is stale and removable.
            match std::os::unix::net::UnixStream::connect(&sock_path) {
                Ok(_) => {
                    anyhow::bail!(
                        "Another daemon is already listening on {}",
                        sock_path.display()
                    );
                }
                Err(_) => {
                    std::fs::remove_file(&sock_path).ok();
                    tracing::debug!(path = %sock_path.display(), "Removed stale socket file");
                }
            }
        }
        let listener = std::os::unix::net::UnixListener::bind(&sock_path)
            .with_context(|| format!("Failed to bind socket at {}", sock_path.display()))?;
        listener.set_nonblocking(true)?;
        {
            // Owner-only permissions: the daemon answers arbitrary queries.
            use std::os::unix::fs::PermissionsExt;
            std::fs::set_permissions(&sock_path, std::fs::Permissions::from_mode(0o600)).ok();
        }
        tracing::info!(
            socket = %sock_path.display(),
            pid = std::process::id(),
            "Daemon listening"
        );
        if !cli.quiet {
            println!("Daemon listening on {}", sock_path.display());
            // Echo the query-affecting env vars so a user can see which
            // overrides the daemon captured at startup.
            let query_env_vars = [
                "CQS_SPLADE_ALPHA",
                "CQS_DISABLE_BASE_INDEX",
                "CQS_FORCE_BASE_INDEX",
                "CQS_CAGRA_THRESHOLD",
                "CQS_INTEGRITY_CHECK",
                "CQS_EF_SEARCH",
            ];
            let set_vars: Vec<String> = query_env_vars
                .iter()
                .filter_map(|k| std::env::var(k).ok().map(|v| format!("{k}={v}")))
                .collect();
            let per_cat_vars: Vec<String> = std::env::vars()
                .filter(|(k, _)| k.starts_with("CQS_SPLADE_ALPHA_"))
                .map(|(k, v)| format!("{k}={v}"))
                .collect();
            if !set_vars.is_empty() || !per_cat_vars.is_empty() {
                println!("Query env: {}", set_vars.join(", "));
                if !per_cat_vars.is_empty() {
                    println!("Per-category SPLADE overrides: {}", per_cat_vars.join(", "));
                }
            } else {
                println!("Query env: (using defaults from code)");
            }
        }
        Some((listener, sock_path))
    } else {
        None
    };
    // Guard removes the socket file when cmd_watch returns.
    #[cfg(unix)]
    let _socket_guard = socket_listener
        .as_ref()
        .map(|(_, path)| SocketCleanupGuard(path.clone()));
    #[cfg(not(unix))]
    if serve {
        tracing::warn!("--serve is not supported on Windows (no Unix domain sockets)");
    }
    // Spawn the accept loop on its own thread; it builds its own BatchContext
    // so queries never contend with the reindex loop below.
    #[cfg(unix)]
    let _socket_thread = if serve {
        if let Some((listener, _)) = socket_listener.take() {
            // Back to blocking accepts now that the bind-time probe is done.
            listener.set_nonblocking(false)?;
            let thread = std::thread::spawn(move || {
                let ctx = match super::batch::create_context() {
                    Ok(ctx) => {
                        ctx.warm();
                        ctx
                    }
                    Err(e) => {
                        tracing::error!(error = %e, "Daemon BatchContext creation failed");
                        return;
                    }
                };
                tracing::info!("Daemon query thread ready");
                for stream in listener.incoming() {
                    match stream {
                        Ok(s) => handle_socket_client(s, &ctx),
                        Err(e) => {
                            tracing::debug!(error = %e, "Socket accept error");
                        }
                    }
                }
            });
            Some(thread)
        } else {
            None
        }
    } else {
        None
    };
    // --- Watcher setup ------------------------------------------------------
    let parser = CqParser::new()?;
    let supported_ext: HashSet<_> = parser.supported_extensions().iter().cloned().collect();
    println!(
        "Watching {} for changes (Ctrl+C to stop)...",
        root.display()
    );
    println!(
        "Code extensions: {}",
        supported_ext.iter().cloned().collect::<Vec<_>>().join(", ")
    );
    println!("Also watching: docs/notes.toml");
    if cqs::splade::resolve_splade_model_dir().is_some() {
        println!(
            "⚠ SPLADE model configured but watch mode does not refresh sparse vectors for \
             newly-added chunks. Run 'cqs index' after a stable edit session to restore \
             full SPLADE coverage. Sparse correctness for removed chunks is maintained \
             automatically via the v20 schema trigger."
        );
        tracing::warn!(
            "Watch mode does not re-run SPLADE encoding — new chunks will have no sparse \
             vectors until manual 'cqs index'. Removals are handled via the v20 chunks-delete \
             trigger."
        );
    }
    let (tx, rx) = mpsc::channel();
    let config = Config::default().with_poll_interval(Duration::from_millis(debounce_ms));
    let mut watcher: Box<dyn Watcher> = if use_poll {
        println!("Using poll watcher (interval: {}ms)", debounce_ms);
        Box::new(PollWatcher::new(tx, config)?)
    } else {
        Box::new(RecommendedWatcher::new(tx, config)?)
    };
    watcher.watch(&root, RecursiveMode::Recursive)?;
    let debounce = Duration::from_millis(debounce_ms);
    let notes_path = root.join("docs/notes.toml");
    // Canonicalize both special paths so event-path comparisons are exact.
    let cqs_dir = dunce::canonicalize(&cqs_dir).unwrap_or_else(|e| {
        tracing::debug!(path = %cqs_dir.display(), error = %e, "canonicalize failed, using original");
        cqs_dir
    });
    let notes_path = dunce::canonicalize(&notes_path).unwrap_or_else(|e| {
        tracing::debug!(path = %notes_path.display(), error = %e, "canonicalize failed, using original");
        notes_path
    });
    let embedder: OnceCell<Embedder> = OnceCell::new();
    let mut store = Store::open(&index_path)
        .with_context(|| format!("Failed to open store at {}", index_path.display()))?;
    // Remember the DB's (dev, inode) so a replacement by 'cqs index --force'
    // can be detected and the store reopened.
    let mut db_id = db_file_identity(&index_path);
    let (hnsw_index, incremental_count) =
        match HnswIndex::load_with_dim(cqs_dir.as_ref(), "index", store.dim()) {
            Ok(index) => {
                info!(vectors = index.len(), "Loaded existing HNSW index");
                // Start halfway to the rebuild threshold: the loaded index's
                // incremental history is unknown, so rebuild sooner.
                (Some(index), hnsw_rebuild_threshold() / 2)
            }
            Err(ref e) if matches!(e, cqs::hnsw::HnswError::NotFound(_)) => {
                tracing::debug!("No prior HNSW index, starting fresh");
                (None, 0)
            }
            Err(e) => {
                tracing::warn!(error = %e, "Existing HNSW index unusable, rebuilding from scratch");
                (None, 0)
            }
        };
    let model_config = cli.try_model_config()?;
    let watch_cfg = WatchConfig {
        root: &root,
        cqs_dir: &cqs_dir,
        notes_path: &notes_path,
        supported_ext: &supported_ext,
        parser: &parser,
        embedder: &embedder,
        quiet: cli.quiet,
        model_config,
    };
    let mut state = WatchState {
        embedder_backoff: EmbedderBackoff::new(),
        pending_files: HashSet::new(),
        pending_notes: false,
        last_event: std::time::Instant::now(),
        last_indexed_mtime: HashMap::with_capacity(1024),
        hnsw_index,
        incremental_count,
    };
    // --- Event loop ---------------------------------------------------------
    // Counts idle 100ms ticks since the last reindex; used to release caches
    // after a long quiet stretch.
    let mut cycles_since_clear: u32 = 0;
    loop {
        match rx.recv_timeout(Duration::from_millis(100)) {
            Ok(Ok(event)) => {
                collect_events(&event, &watch_cfg, &mut state);
            }
            Ok(Err(e)) => {
                warn!(error = %e, "Watch error");
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // Reindex only once the debounce window has passed with no
                // further events.
                let should_process = (!state.pending_files.is_empty() || state.pending_notes)
                    && state.last_event.elapsed() >= debounce;
                if should_process {
                    cycles_since_clear = 0;
                    // Serialize with other cqs processes via the index lock;
                    // pending work is retried on the next tick if held.
                    let lock = match try_acquire_index_lock(&cqs_dir) {
                        Ok(Some(lock)) => lock,
                        Ok(None) => {
                            info!("Index lock held by another process, skipping reindex cycle");
                            continue;
                        }
                        Err(e) => {
                            warn!(error = %e, "Failed to create index lock file");
                            continue;
                        }
                    };
                    // If index.db was atomically replaced, reopen it and drop
                    // the now-stale in-memory HNSW state.
                    let current_id = db_file_identity(&index_path);
                    if current_id != db_id {
                        info!("index.db replaced (likely cqs index --force), reopening store");
                        drop(store);
                        store = Store::open(&index_path).with_context(|| {
                            format!(
                                "Failed to re-open store at {} after DB replacement",
                                index_path.display()
                            )
                        })?;
                        state.hnsw_index = None;
                        state.incremental_count = 0;
                    }
                    if !state.pending_files.is_empty() {
                        process_file_changes(&watch_cfg, &store, &mut state);
                    }
                    if state.pending_notes {
                        state.pending_notes = false;
                        process_note_changes(&root, &store, cli.quiet);
                    }
                    store.clear_caches();
                    db_id = db_file_identity(&index_path);
                    drop(lock);
                } else {
                    cycles_since_clear += 1;
                    // ~5 minutes idle (3000 × 100ms): release the embedder
                    // session and in-memory HNSW to reclaim memory.
                    if cycles_since_clear >= 3000 {
                        if let Some(emb) = embedder.get() {
                            emb.clear_session();
                        }
                        state.hnsw_index = None;
                        state.incremental_count = 0;
                        cycles_since_clear = 0;
                    }
                }
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                bail!(
                    "File watcher disconnected unexpectedly. \
                     Hint: Restart 'cqs watch' to resume monitoring."
                );
            }
        }
        if check_interrupted() {
            println!("\nStopping watch...");
            break;
        }
    }
    Ok(())
}
fn collect_events(event: ¬ify::Event, cfg: &WatchConfig, state: &mut WatchState) {
for path in &event.paths {
let path = if path.exists() {
dunce::canonicalize(path).unwrap_or_else(|_| path.clone())
} else {
path.clone()
};
let norm_path = cqs::normalize_path(&path);
let norm_cqs = cqs::normalize_path(cfg.cqs_dir);
if norm_path.starts_with(&norm_cqs) {
tracing::debug!(path = %norm_path, "Skipping .cqs directory event");
continue;
}
let norm_notes = cqs::normalize_path(cfg.notes_path);
if norm_path == norm_notes {
state.pending_notes = true;
state.last_event = std::time::Instant::now();
continue;
}
let ext_raw = path.extension().and_then(|e| e.to_str()).unwrap_or("");
let ext = ext_raw.to_ascii_lowercase();
if !cfg.supported_ext.contains(ext.as_str()) {
tracing::debug!(path = %path.display(), ext = %ext, "Skipping unsupported extension");
continue;
}
if let Ok(rel) = path.strip_prefix(cfg.root) {
if let Ok(mtime) = std::fs::metadata(&path).and_then(|m| m.modified()) {
if state
.last_indexed_mtime
.get(rel)
.is_some_and(|last| mtime <= *last)
{
tracing::trace!(path = %rel.display(), "Skipping unchanged mtime");
continue;
}
}
if state.pending_files.len() < max_pending_files() {
state.pending_files.insert(rel.to_path_buf());
} else {
tracing::warn!(
max = max_pending_files(),
path = %rel.display(),
"Watch pending_files full, dropping file event"
);
}
state.last_event = std::time::Instant::now();
}
}
}
/// Run one reindex cycle over the queued files, then keep the HNSW index in
/// sync — either a full rebuild (no index loaded, or too many incremental
/// inserts) or an incremental insert of just the new chunks' embeddings.
fn process_file_changes(cfg: &WatchConfig, store: &Store, state: &mut WatchState) {
    let files: Vec<PathBuf> = state.pending_files.drain().collect();
    let _span = info_span!("process_file_changes", file_count = files.len()).entered();
    // Release excess capacity left over from a large burst.
    state.pending_files.shrink_to(64);
    if !cfg.quiet {
        println!("\n{} file(s) changed, reindexing...", files.len());
        for f in &files {
            println!("  {}", f.display());
        }
    }
    // No embedder (init failing, still in backoff): drop this cycle entirely.
    let emb = match try_init_embedder(cfg.embedder, &mut state.embedder_backoff, cfg.model_config) {
        Some(e) => e,
        None => return,
    };
    // Capture mtimes *before* reindexing: if a file changes mid-cycle, its
    // newer mtime will not be suppressed by the dedup check next time.
    let pre_mtimes: HashMap<PathBuf, SystemTime> = files
        .iter()
        .filter_map(|f| {
            std::fs::metadata(cfg.root.join(f))
                .and_then(|m| m.modified())
                .ok()
                .map(|t| (f.clone(), t))
        })
        .collect();
    // Mark the HNSW index dirty first so a crash mid-reindex cannot leave a
    // clean-looking but stale index on disk.
    if let Err(e) = store.set_hnsw_dirty(true) {
        tracing::warn!(error = %e, "Cannot set HNSW dirty flag — skipping reindex to prevent stale index on crash");
        return;
    }
    match reindex_files(cfg.root, store, &files, cfg.parser, emb, cfg.quiet) {
        Ok((count, content_hashes)) => {
            for (file, mtime) in pre_mtimes {
                state.last_indexed_mtime.insert(file, mtime);
            }
            // Bound the mtime map: evict entries for files that no longer exist.
            if state.last_indexed_mtime.len() > 5_000 {
                state
                    .last_indexed_mtime
                    .retain(|f, _| cfg.root.join(f).exists());
            }
            if !cfg.quiet {
                println!("Indexed {} chunk(s)", count);
            }
            let needs_full_rebuild =
                state.hnsw_index.is_none() || state.incremental_count >= hnsw_rebuild_threshold();
            if needs_full_rebuild {
                match super::commands::build_hnsw_index_owned(store, cfg.cqs_dir) {
                    Ok(Some(index)) => {
                        let n = index.len();
                        state.hnsw_index = Some(index);
                        state.incremental_count = 0;
                        if let Err(e) = store.set_hnsw_dirty(false) {
                            tracing::warn!(error = %e, "Failed to clear HNSW dirty flag — unnecessary rebuild on next load");
                        }
                        info!(vectors = n, "HNSW index rebuilt (full)");
                        if !cfg.quiet {
                            println!("  HNSW index: {} vectors (full rebuild)", n);
                        }
                    }
                    Ok(None) => {
                        state.hnsw_index = None;
                    }
                    Err(e) => {
                        warn!(error = %e, "HNSW rebuild failed, removing stale HNSW files (search falls back to brute-force)");
                        state.hnsw_index = None;
                        // Delete both the enriched and base index files so a
                        // later load cannot pick up a half-written index.
                        for ext in cqs::hnsw::HNSW_ALL_EXTENSIONS {
                            let path = cfg.cqs_dir.join(format!("index.{}", ext));
                            if path.exists() {
                                let _ = std::fs::remove_file(&path);
                            }
                            let base_path = cfg.cqs_dir.join(format!("index_base.{}", ext));
                            if base_path.exists() {
                                let _ = std::fs::remove_file(&base_path);
                            }
                        }
                    }
                }
                match super::commands::build_hnsw_base_index(store, cfg.cqs_dir) {
                    Ok(Some(n)) => {
                        info!(vectors = n, "Base HNSW index rebuilt");
                        if !cfg.quiet {
                            println!("  HNSW base index: {} vectors (full rebuild)", n);
                        }
                    }
                    Ok(None) => {
                        // Nothing to build (no base vectors); not an error.
                    }
                    Err(e) => {
                        warn!(error = %e, "Base HNSW rebuild failed, router falls back to enriched-only");
                    }
                }
            } else if !content_hashes.is_empty() {
                // Incremental path: fetch only the freshly-embedded chunks by
                // content hash and insert them into the loaded index.
                let hash_refs: Vec<&str> = content_hashes.iter().map(|s| s.as_str()).collect();
                match store.get_chunk_ids_and_embeddings_by_hashes(&hash_refs) {
                    Ok(pairs) if !pairs.is_empty() => {
                        let items: Vec<(String, &[f32])> = pairs
                            .iter()
                            .map(|(id, emb)| (id.clone(), emb.as_slice()))
                            .collect();
                        if let Some(ref mut index) = state.hnsw_index {
                            match index.insert_batch(&items) {
                                Ok(n) => {
                                    state.incremental_count += n;
                                    // Only clear the dirty flag after a
                                    // successful save.
                                    if let Err(e) = index.save(cfg.cqs_dir, "index") {
                                        warn!(error = %e, "Failed to save HNSW after incremental insert");
                                    } else if let Err(e) = store.set_hnsw_dirty(false) {
                                        tracing::warn!(error = %e, "Failed to clear HNSW dirty flag — unnecessary rebuild on next load");
                                    }
                                    info!(
                                        inserted = n,
                                        total = index.len(),
                                        incremental_count = state.incremental_count,
                                        "HNSW incremental insert"
                                    );
                                    if !cfg.quiet {
                                        println!(
                                            "  HNSW index: +{} vectors (incremental, {} total)",
                                            n,
                                            index.len()
                                        );
                                    }
                                }
                                Err(e) => {
                                    warn!(error = %e, "HNSW incremental insert failed, will rebuild next cycle");
                                    state.hnsw_index = None;
                                }
                            }
                        }
                    }
                    // No matching embeddings found: nothing to insert.
                    Ok(_) => {}
                    Err(e) => {
                        warn!(error = %e, "Failed to fetch embeddings for HNSW incremental insert");
                    }
                }
            }
        }
        Err(e) => {
            warn!(error = %e, "Reindex error");
        }
    }
}
fn process_note_changes(root: &Path, store: &Store, quiet: bool) {
if !quiet {
println!("\nNotes changed, reindexing...");
}
match reindex_notes(root, store, quiet) {
Ok(count) => {
if !quiet {
println!("Indexed {} note(s)", count);
}
}
Err(e) => {
warn!(error = %e, "Notes reindex error");
}
}
}
/// Parse, embed, and upsert the given changed files (paths relative to
/// `root`). Deleted files have their chunks removed from the store.
///
/// Returns `(chunks_indexed, content_hashes_of_newly_embedded_chunks)`; the
/// hashes let the caller insert only the fresh vectors into HNSW.
///
/// # Errors
/// Propagates store and embedding failures; per-file parse failures are
/// logged and skipped.
fn reindex_files(
    root: &Path,
    store: &Store,
    files: &[PathBuf],
    parser: &CqParser,
    embedder: &Embedder,
    quiet: bool,
) -> Result<(usize, Vec<String>)> {
    let _span = info_span!("reindex_files", file_count = files.len()).entered();
    info!(file_count = files.len(), "Reindexing files");
    let mut all_type_refs: Vec<(PathBuf, Vec<ChunkTypeRefs>)> = Vec::new();
    // Parse every file; the closure has side effects: deletions for removed
    // files and function-call upserts happen inline, per file.
    let chunks: Vec<_> = files
        .iter()
        .flat_map(|rel_path| {
            let abs_path = root.join(rel_path);
            // File was deleted: purge its chunks and contribute nothing.
            if !abs_path.exists() {
                if let Err(e) = store.delete_by_origin(rel_path) {
                    tracing::warn!(
                        path = %rel_path.display(),
                        error = %e,
                        "Failed to delete chunks for deleted file"
                    );
                }
                return vec![];
            }
            match parser.parse_file_all(&abs_path) {
                Ok((mut file_chunks, calls, chunk_type_refs)) => {
                    // Rewrite absolute parser paths/ids to root-relative form
                    // so stored ids are stable across machines.
                    for chunk in &mut file_chunks {
                        chunk.file = rel_path.clone();
                        if let Some(rest) = chunk.id.strip_prefix(&abs_path.display().to_string()) {
                            chunk.id = format!("{}{}", rel_path.display(), rest);
                        }
                    }
                    if !chunk_type_refs.is_empty() {
                        all_type_refs.push((rel_path.clone(), chunk_type_refs));
                    }
                    if !calls.is_empty() {
                        if let Err(e) = store.upsert_function_calls(rel_path, &calls) {
                            tracing::warn!(
                                path = %rel_path.display(),
                                error = %e,
                                "Failed to write function_calls for watched file"
                            );
                        }
                    }
                    file_chunks
                }
                Err(e) => {
                    tracing::warn!(path = %abs_path.display(), error = %e, "Failed to parse file");
                    vec![]
                }
            }
        })
        .collect();
    let chunks = crate::cli::pipeline::apply_windowing(chunks, embedder);
    if chunks.is_empty() {
        return Ok((0, Vec::new()));
    }
    // Embedding cache: chunks whose content hash already has a stored vector
    // reuse it; only genuinely new content goes to the embedder.
    let hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
    let existing = store.get_embeddings_by_hashes(&hashes)?;
    let mut cached: Vec<(usize, Embedding)> = Vec::new();
    let mut to_embed: Vec<(usize, &cqs::Chunk)> = Vec::new();
    for (i, chunk) in chunks.iter().enumerate() {
        if let Some(emb) = existing.get(&chunk.content_hash) {
            cached.push((i, emb.clone()));
        } else {
            to_embed.push((i, chunk));
        }
    }
    tracing::info!(
        cached = cached.len(),
        to_embed = to_embed.len(),
        "Embedding cache stats"
    );
    // Hashes of newly embedded chunks — returned for incremental HNSW insert.
    let content_hashes: Vec<String> = to_embed
        .iter()
        .map(|(_, c)| c.content_hash.clone())
        .collect();
    let new_embeddings: Vec<Embedding> = if to_embed.is_empty() {
        vec![]
    } else {
        // Embed the NL description of each chunk, not the raw source.
        let texts: Vec<String> = to_embed
            .iter()
            .map(|(_, c)| generate_nl_description(c))
            .collect();
        let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
        embedder.embed_documents(&text_refs)?.into_iter().collect()
    };
    // Scatter cached + fresh embeddings back into chunk order.
    let chunk_count = chunks.len();
    let mut embeddings: Vec<Embedding> = vec![Embedding::new(vec![]); chunk_count];
    for (i, emb) in cached {
        embeddings[i] = emb;
    }
    for ((i, _), emb) in to_embed.into_iter().zip(new_embeddings) {
        embeddings[i] = emb;
    }
    let mut calls_by_id: HashMap<String, Vec<cqs::parser::CallSite>> = HashMap::new();
    for chunk in &chunks {
        let calls = parser.extract_calls_from_chunk(chunk);
        if !calls.is_empty() {
            calls_by_id
                .entry(chunk.id.clone())
                .or_default()
                .extend(calls);
        }
    }
    // Group (chunk, embedding) pairs per file so each file is upserted — and
    // its phantom chunks deleted — in one pass.
    let mut mtime_cache: HashMap<PathBuf, Option<i64>> = HashMap::new();
    let mut by_file: HashMap<PathBuf, Vec<(cqs::Chunk, Embedding)>> = HashMap::new();
    for (chunk, embedding) in chunks.into_iter().zip(embeddings.into_iter()) {
        let file_key = chunk.file.clone();
        by_file
            .entry(file_key)
            .or_default()
            .push((chunk, embedding));
    }
    for (file, pairs) in &by_file {
        // File mtime in epoch millis, looked up at most once per file.
        let mtime = *mtime_cache.entry(file.clone()).or_insert_with(|| {
            let abs_path = root.join(file);
            abs_path
                .metadata()
                .and_then(|m| m.modified())
                .ok()
                .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
                .map(|d| d.as_millis() as i64)
        });
        let file_calls: Vec<_> = pairs
            .iter()
            .flat_map(|(c, _)| {
                calls_by_id
                    .get(&c.id)
                    .into_iter()
                    .flat_map(|calls| calls.iter().map(|call| (c.id.clone(), call.clone())))
            })
            .collect();
        store.upsert_chunks_and_calls(pairs, mtime, &file_calls)?;
        // Remove stored chunks for this file that no longer exist in source.
        let live_ids: Vec<&str> = pairs.iter().map(|(c, _)| c.id.as_str()).collect();
        store.delete_phantom_chunks(file, &live_ids)?;
    }
    if let Err(e) = store.upsert_type_edges_for_files(&all_type_refs) {
        tracing::warn!(error = %e, "Failed to update type edges");
    }
    if let Err(e) = store.touch_updated_at() {
        tracing::warn!(error = %e, "Failed to update timestamp");
    }
    if !quiet {
        println!("Updated {} file(s)", files.len());
    }
    Ok((chunk_count, content_hashes))
}
/// Re-parse `docs/notes.toml` under a shared file lock and reindex its notes.
///
/// Returns the number of notes indexed; 0 when the file is missing or empty.
///
/// # Errors
/// Propagates open/lock, parse, indexing, and stats failures.
fn reindex_notes(root: &Path, store: &Store, quiet: bool) -> Result<usize> {
    let _span = info_span!("reindex_notes").entered();
    let notes_path = root.join("docs/notes.toml");
    if !notes_path.exists() {
        return Ok(0);
    }
    // Shared lock keeps a concurrent writer from handing us a torn file; it
    // is released when `guard` goes out of scope (or at the explicit drops).
    let guard = std::fs::File::open(&notes_path)?;
    guard.lock_shared()?;
    let notes = parse_notes(&notes_path)?;
    if notes.is_empty() {
        drop(guard);
        return Ok(0);
    }
    let count = cqs::index_notes(&notes, &notes_path, store)?;
    drop(guard);
    if !quiet {
        let stats = store.note_stats()?;
        println!(
            "  Notes: {} total ({} warnings, {} patterns)",
            stats.total, stats.warnings, stats.patterns
        );
    }
    Ok(count)
}
#[cfg(test)]
mod tests {
    use super::*;
    use notify::EventKind;
    use std::collections::{HashMap, HashSet};
    use std::path::PathBuf;
    /// Build a minimal notify event for the given paths.
    fn make_event(paths: Vec<PathBuf>, kind: EventKind) -> notify::Event {
        notify::Event {
            kind,
            paths,
            attrs: Default::default(),
        }
    }
    /// Build a WatchConfig for tests. Parser/embedder/model-config are leaked
    /// so the returned config can borrow them with lifetime 'a.
    fn test_watch_config<'a>(
        root: &'a Path,
        cqs_dir: &'a Path,
        notes_path: &'a Path,
        supported_ext: &'a HashSet<&'a str>,
    ) -> WatchConfig<'a> {
        let parser = Box::leak(Box::new(CqParser::new().unwrap()));
        let embedder = Box::leak(Box::new(OnceCell::new()));
        let model_config = Box::leak(Box::new(ModelConfig::default_model()));
        WatchConfig {
            root,
            cqs_dir,
            notes_path,
            supported_ext,
            parser,
            embedder,
            quiet: true,
            model_config,
        }
    }
    /// Fresh, empty watch state.
    fn test_watch_state() -> WatchState {
        WatchState {
            embedder_backoff: EmbedderBackoff::new(),
            pending_files: HashSet::new(),
            pending_notes: false,
            last_event: std::time::Instant::now(),
            last_indexed_mtime: HashMap::new(),
            hnsw_index: None,
            incremental_count: 0,
        }
    }
    #[test]
    fn backoff_initial_state_allows_retry() {
        let backoff = EmbedderBackoff::new();
        assert!(backoff.should_retry(), "Fresh backoff should allow retry");
    }
    #[test]
    fn backoff_after_failure_delays_retry() {
        let mut backoff = EmbedderBackoff::new();
        backoff.record_failure();
        assert!(
            !backoff.should_retry(),
            "Should not retry immediately after failure"
        );
        assert_eq!(backoff.failures, 1);
    }
    #[test]
    fn backoff_reset_clears_failures() {
        let mut backoff = EmbedderBackoff::new();
        backoff.record_failure();
        backoff.record_failure();
        backoff.reset();
        assert_eq!(backoff.failures, 0);
        assert!(backoff.should_retry());
    }
    // NOTE(review): only asserts the failure count after 9 failures; it does
    // not observe that the computed delay is actually capped at 300s.
    #[test]
    fn backoff_caps_at_300s() {
        let mut backoff = EmbedderBackoff::new();
        for _ in 0..9 {
            backoff.record_failure();
        }
        assert_eq!(backoff.failures, 9);
    }
    #[test]
    fn backoff_saturating_add_no_overflow() {
        let mut backoff = EmbedderBackoff::new();
        backoff.failures = u32::MAX;
        backoff.record_failure();
        assert_eq!(backoff.failures, u32::MAX, "Should saturate, not overflow");
    }
    #[test]
    fn collect_events_filters_unsupported_extensions() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs", "py", "js"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/readme.txt")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            "Unsupported extension should not be added"
        );
        assert!(!state.pending_notes);
    }
    // "db" is deliberately in the supported set here to prove the .cqs-dir
    // filter wins over the extension check.
    #[test]
    fn collect_events_skips_cqs_dir() {
        let root = PathBuf::from("/tmp/test_project");
        let cqs_dir = PathBuf::from("/tmp/test_project/.cqs");
        let notes_path = PathBuf::from("/tmp/test_project/docs/notes.toml");
        let supported: HashSet<&str> = ["rs", "db"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let event = make_event(
            vec![PathBuf::from("/tmp/test_project/.cqs/index.db")],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            ".cqs dir events should be skipped"
        );
    }
    // Uses a real temp dir so canonicalization of the notes path succeeds.
    #[test]
    fn collect_events_detects_notes_path() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let cqs_dir = root.join(".cqs");
        let notes_dir = root.join("docs");
        std::fs::create_dir_all(&notes_dir).unwrap();
        let notes_path = notes_dir.join("notes.toml");
        std::fs::write(&notes_path, "# notes").unwrap();
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let event = make_event(
            vec![notes_path.clone()],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(state.pending_notes, "Notes path should set pending_notes");
        assert!(
            state.pending_files.is_empty(),
            "Notes should not be added to pending_files"
        );
    }
    #[test]
    fn collect_events_respects_max_pending_files() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let cqs_dir = root.join(".cqs");
        let notes_path = root.join("docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        // Pre-fill the queue to capacity, then deliver one more event.
        for i in 0..max_pending_files() {
            state
                .pending_files
                .insert(PathBuf::from(format!("f{}.rs", i)));
        }
        let new_file = root.join("overflow.rs");
        std::fs::write(&new_file, "fn main() {}").unwrap();
        let event = make_event(
            vec![new_file],
            EventKind::Create(notify::event::CreateKind::File),
        );
        collect_events(&event, &cfg, &mut state);
        assert_eq!(
            state.pending_files.len(),
            max_pending_files(),
            "Should not exceed max_pending_files()"
        );
    }
    #[test]
    fn collect_events_skips_unchanged_mtime() {
        let tmp = tempfile::TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let cqs_dir = root.join(".cqs");
        let notes_path = root.join("docs/notes.toml");
        let supported: HashSet<&str> = ["rs"].iter().cloned().collect();
        let cfg = test_watch_config(&root, &cqs_dir, &notes_path, &supported);
        let mut state = test_watch_state();
        let file = root.join("src/lib.rs");
        std::fs::create_dir_all(root.join("src")).unwrap();
        std::fs::write(&file, "fn main() {}").unwrap();
        // Record the file's current mtime as already indexed.
        let mtime = std::fs::metadata(&file).unwrap().modified().unwrap();
        state
            .last_indexed_mtime
            .insert(PathBuf::from("src/lib.rs"), mtime);
        let event = make_event(
            vec![file],
            EventKind::Modify(notify::event::ModifyKind::Data(
                notify::event::DataChange::Content,
            )),
        );
        collect_events(&event, &cfg, &mut state);
        assert!(
            state.pending_files.is_empty(),
            "Unchanged mtime should be skipped"
        );
    }
    #[test]
    fn hnsw_rebuild_threshold_is_reasonable() {
        assert!(hnsw_rebuild_threshold() > 0);
        assert!(hnsw_rebuild_threshold() <= 1000);
    }
    #[test]
    fn max_pending_files_is_bounded() {
        assert!(max_pending_files() > 0);
        assert!(max_pending_files() <= 100_000);
    }
}