use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use notify::{EventKind, RecursiveMode, Watcher};
use second_brain_core::embedding::Embedder;
use second_brain_core::kuzu_store::KuzuStore;
use crate::ingest::ingest_file;
pub fn spawn(store: Arc<KuzuStore>, embedder: Option<Arc<Embedder>>, machine_id: String) {
std::thread::spawn(move || {
if let Err(e) = run(store, embedder, machine_id) {
tracing::error!("watcher thread died: {e}");
}
});
}
fn run(store: Arc<KuzuStore>, embedder: Option<Arc<Embedder>>, machine_id: String) -> anyhow::Result<()> {
let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
let watch_dir = PathBuf::from(&home).join(".claude").join("projects");
if !watch_dir.exists() {
tracing::info!("watcher: {} does not exist, skipping", watch_dir.display());
return Ok(());
}
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = notify::recommended_watcher(move |res: Result<notify::Event, _>| {
if let Ok(event) = res
&& matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_))
{
for path in event.paths {
if path.extension().is_some_and(|e| e == "jsonl") {
tx.send(path).ok();
}
}
}
})?;
watcher.watch(&watch_dir, RecursiveMode::Recursive)?;
tracing::info!("watcher: monitoring {}", watch_dir.display());
let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
let settle_duration = Duration::from_secs(10);
loop {
while let Ok(path) = rx.try_recv() {
pending.insert(path, Instant::now());
}
let now = Instant::now();
let ready: Vec<PathBuf> = pending
.iter()
.filter(|(_, when)| now.duration_since(**when) >= settle_duration)
.map(|(p, _)| p.clone())
.collect();
for path in ready {
pending.remove(&path);
ingest_file(&store, embedder.as_deref(), &path, &machine_id);
}
std::thread::sleep(Duration::from_secs(2));
}
}