second-brain-api 0.3.1

HTTP API server for second-brain: REST endpoints for recall, remember, and ingest
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use notify::{EventKind, RecursiveMode, Watcher};

use second_brain_core::embedding::Embedder;
use second_brain_core::kuzu_store::KuzuStore;

use crate::ingest::ingest_file;

pub fn spawn(store: Arc<KuzuStore>, embedder: Option<Arc<Embedder>>) {
    std::thread::spawn(move || {
        if let Err(e) = run(store, embedder) {
            tracing::error!("watcher thread died: {e}");
        }
    });
}

fn run(store: Arc<KuzuStore>, embedder: Option<Arc<Embedder>>) -> anyhow::Result<()> {
    let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
    let watch_dir = PathBuf::from(&home).join(".claude").join("projects");

    if !watch_dir.exists() {
        tracing::info!("watcher: {} does not exist, skipping", watch_dir.display());
        return Ok(());
    }

    let (tx, rx) = std::sync::mpsc::channel();

    let mut watcher = notify::recommended_watcher(move |res: Result<notify::Event, _>| {
        if let Ok(event) = res
            && matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_))
        {
            for path in event.paths {
                if path.extension().is_some_and(|e| e == "jsonl") {
                    tx.send(path).ok();
                }
            }
        }
    })?;

    watcher.watch(&watch_dir, RecursiveMode::Recursive)?;
    tracing::info!("watcher: monitoring {}", watch_dir.display());

    let mut pending: HashMap<PathBuf, Instant> = HashMap::new();
    let settle_duration = Duration::from_secs(10);

    loop {
        while let Ok(path) = rx.try_recv() {
            pending.insert(path, Instant::now());
        }

        let now = Instant::now();
        let ready: Vec<PathBuf> = pending
            .iter()
            .filter(|(_, when)| now.duration_since(**when) >= settle_duration)
            .map(|(p, _)| p.clone())
            .collect();

        for path in ready {
            pending.remove(&path);
            ingest_file(&store, embedder.as_deref(), &path);
        }

        std::thread::sleep(Duration::from_secs(2));
    }
}