// spool-memory 0.2.3
//
// Local-first developer memory system — persistent, structured knowledge for AI coding tools
// Documentation
//! Distill-pending queue — append-only JSONL log under
//! `<cwd>/.spool/distill-pending.queue` that Stop / PreCompact hooks
//! drain into candidate / accepted memory records.
//!
//! ## Why a separate file (vs. piggybacking on the ledger)?
//! - The ledger is the lifecycle truth source. Half-cooked signals
//!   (raw tool outputs, partial prompts) MUST NOT pollute it.
//! - The queue is intentionally lossy under pressure: by default it
//!   keeps only the 100 most recent entries ([`DEFAULT_LRU_CAP`]) and
//!   silently drops older ones. The ledger never drops.
//! - The queue file is project-local; deleting `.spool/` on a fresh
//!   clone safely wipes accumulated signals without touching the
//!   shared vault ledger.
//!
//! ## Concurrency model
//! Multiple Claude Code instances may run hooks against the same
//! `.spool/` (e.g. user opens two terminals in the same repo). We use
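//! `fs2::FileExt::lock_exclusive` (`flock(2)` on Unix) to serialize all
//! mutating operations. flock locks are advisory (they only exclude
//! other code that also takes the lock) and are held per open file
//! description on the file's inode; that is sufficient for single-host
//! concurrency. We do NOT support cross-host network FS
//! concurrency (out of scope per ADR-0001 single-user assumption).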
//!
//! ## File format
//! One JSON object per line. Each entry is a serialized [`DistillSignal`],
//! which mirrors `hook_runtime::post_tool_use::DistillSignalEnvelope`
//! field for field (`tool_name` and `payload` are omitted when absent):
//! ```text
//! {"recorded_at": <unix>, "tool_name": <opt>, "cwd": <abs>, "payload": <opt>}
//! ```
//!
//! ## Operations
//! - [`append`]: tail-add a single envelope; if total lines exceed the
//!   LRU cap after append, rewrite the file keeping only the most
//!   recent N entries.
//! - [`drain_all`]: read & truncate to empty (Stop hook consumes
//!   queue).
//! - [`peek_all`]: read without truncating (debugging / doctor).

use anyhow::{Context, Result};
use fs2::FileExt;
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

/// File name of the distill-pending queue inside the runtime directory.
pub const DEFAULT_QUEUE_FILE_NAME: &str = "distill-pending.queue";

/// Default number of most-recent entries the queue retains; older
/// entries are evicted first.
pub const DEFAULT_LRU_CAP: usize = 100;

/// A single distill signal envelope, as written by the post-tool-use hook.
///
/// This is a field-for-field mirror of
/// `hook_runtime::post_tool_use::DistillSignalEnvelope`, so either type can
/// deserialize what the other serialized. It is declared separately so the
/// queue API does not pull `hook_runtime` into the dependency graph (which
/// would create a cycle).
///
/// Field order matters: serde emits JSON keys in declaration order.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct DistillSignal {
    /// Unix timestamp at which the signal was recorded.
    pub recorded_at: u64,
    /// Name of the tool that produced the signal, if known. Omitted from
    /// the JSON when `None`; defaults to `None` when the key is missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_name: Option<String>,
    /// Working directory the hook ran in.
    pub cwd: String,
    /// Optional raw signal payload. Omitted from the JSON when `None`;
    /// defaults to `None` when the key is missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<String>,
}

/// Build the queue file's path: `<runtime_dir>/distill-pending.queue`.
///
/// Pure path arithmetic; nothing on disk is touched.
pub fn queue_path(runtime_dir: &Path) -> PathBuf {
    let mut queue_file = runtime_dir.to_path_buf();
    queue_file.push(DEFAULT_QUEUE_FILE_NAME);
    queue_file
}

/// Append a single signal envelope to the queue, then enforce the LRU
/// cap. Atomic w.r.t. concurrent appenders thanks to flock.
pub fn append(runtime_dir: &Path, signal: &DistillSignal, lru_cap: usize) -> Result<()> {
    ensure_dir(runtime_dir)?;
    let path = queue_path(runtime_dir);
    let file = OpenOptions::new()
        .create(true)
        .read(true)
        .append(true)
        .open(&path)
        .with_context(|| format!("opening queue {}", path.display()))?;
    file.lock_exclusive()
        .with_context(|| format!("locking queue {}", path.display()))?;

    let result = (|| -> Result<()> {
        let line = serde_json::to_string(signal).context("serializing distill signal")?;
        // Re-open in append mode (the locked handle was opened with
        // append true, so writeln on it appends — but we need a
        // mutable handle. Re-open to side-step lifetime hassles).
        let mut writer = OpenOptions::new()
            .append(true)
            .open(&path)
            .with_context(|| format!("re-opening queue for write {}", path.display()))?;
        writeln!(writer, "{}", line).with_context(|| format!("appending to {}", path.display()))?;
        writer.flush().ok();

        // Enforce LRU cap. Reading + rewriting is a separate pass; we
        // hold the exclusive lock so no concurrent appender will race.
        enforce_lru_cap(&path, lru_cap)?;
        Ok(())
    })();

    let _ = FileExt::unlock(&file);
    result
}

/// Read all lines and atomically truncate the queue to empty. The
/// caller (typically Stop hook) processes the returned signals into
/// the lifecycle ledger.
pub fn drain_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
    let path = queue_path(runtime_dir);
    if !path.exists() {
        return Ok(Vec::new());
    }
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .open(&path)
        .with_context(|| format!("opening queue {}", path.display()))?;
    file.lock_exclusive()
        .with_context(|| format!("locking queue {}", path.display()))?;

    let result = (|| -> Result<Vec<DistillSignal>> {
        let signals = parse_signals_from_path(&path)?;
        // Truncate. We re-open with truncate(true) instead of
        // set_len(0) on the locked handle to keep cursor / fd state
        // simple.
        let mut truncator = OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(&path)
            .with_context(|| format!("truncating {}", path.display()))?;
        truncator.flush().ok();
        Ok(signals)
    })();

    let _ = FileExt::unlock(&file);
    result
}

/// Read all lines without modifying the queue. Useful for doctor /
/// debugging.
pub fn peek_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
    let path = queue_path(runtime_dir);
    if !path.exists() {
        return Ok(Vec::new());
    }
    parse_signals_from_path(&path)
}

/// Create `dir` (and any missing parents) unless something already exists
/// at that path.
fn ensure_dir(dir: &Path) -> Result<()> {
    if dir.exists() {
        return Ok(());
    }
    std::fs::create_dir_all(dir)
        .with_context(|| format!("creating runtime dir {}", dir.display()))
}

/// Parse every JSON line of the queue file at `path` into a `DistillSignal`.
///
/// Lines are handled as follows:
/// - Blank lines are ignored.
/// - Lines that are not valid UTF-8, or are not valid `DistillSignal` JSON,
///   are reported on stderr and skipped. This is the same skip-and-warn
///   policy as the lifecycle ledger.
///
/// # Errors
/// Fails if the file cannot be opened, or on any read error other than
/// invalid UTF-8.
fn parse_signals_from_path(path: &Path) -> Result<Vec<DistillSignal>> {
    let f = File::open(path).with_context(|| format!("opening {}", path.display()))?;
    let reader = BufReader::new(f);
    let mut signals = Vec::new();
    for (idx, line) in reader.lines().enumerate() {
        let line_no = idx + 1;
        let raw = match line {
            Ok(raw) => raw,
            // Invalid UTF-8: `lines()` has already consumed the offending
            // line, so skipping it and moving on is safe.
            Err(err) if err.kind() == std::io::ErrorKind::InvalidData => {
                eprintln!(
                    "[spool queue] read error at {}:{line_no}: {err}",
                    path.display()
                );
                continue;
            }
            // Any other I/O error may recur on every subsequent read, and
            // `lines()` never returns `None` after an error. `continue`-ing
            // here could spin forever, so surface the error instead.
            Err(err) => {
                return Err(err)
                    .with_context(|| format!("reading {} at line {line_no}", path.display()));
            }
        };
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            continue;
        }
        match serde_json::from_str::<DistillSignal>(trimmed) {
            Ok(s) => signals.push(s),
            Err(err) => {
                // Same skip-and-warn policy as the lifecycle ledger
                // (see `LifecycleStore::read_all`): one corrupt line
                // does not down the whole queue.
                eprintln!(
                    "[spool queue] malformed entry at {}:{line_no}: {err}",
                    path.display()
                );
            }
        }
    }
    Ok(signals)
}

/// Trim the queue at `path` down to its `lru_cap` most recent valid
/// entries. A cap of `0` disables trimming entirely.
///
/// Malformed lines are not counted. They are dropped whenever a trim
/// happens, because the file is rebuilt from the parsed signals. The
/// caller is expected to hold the queue's exclusive lock.
fn enforce_lru_cap(path: &Path, lru_cap: usize) -> Result<()> {
    if lru_cap == 0 {
        // Unbounded queue: nothing to do.
        return Ok(());
    }
    let signals = parse_signals_from_path(path)?;
    let excess = signals.len().saturating_sub(lru_cap);
    if excess == 0 {
        return Ok(());
    }
    // Oldest entries sit at the front of the file; keep only the tail.
    rewrite_queue(path, &signals[excess..])
}

fn rewrite_queue(path: &Path, signals: &[DistillSignal]) -> Result<()> {
    // Atomic rewrite: write to <path>.spool-tmp, rename. Caller holds
    // the flock so no appender races during the rename window.
    let tmp = path.with_extension("spool-tmp");
    let mut tmp_file = File::create(&tmp).with_context(|| format!("creating {}", tmp.display()))?;
    for s in signals {
        let line = serde_json::to_string(s).context("serializing distill signal")?;
        writeln!(tmp_file, "{}", line)?;
    }
    tmp_file.flush().ok();
    drop(tmp_file);
    std::fs::rename(&tmp, path)
        .with_context(|| format!("renaming {} -> {}", tmp.display(), path.display()))?;
    Ok(())
}

// Never-called helpers. They reference `Seek` / `SeekFrom` so those
// imports stay "used" even if no other code in this module touches them.
// The imports are kept for possible future offset-based tail reads.
// `#[allow(dead_code)]` silences the lint for the helpers themselves.
#[allow(dead_code)]
fn _seek_anchor(_f: &mut File) -> std::io::Result<u64> {
    let _origin = SeekFrom::Start(0);
    Ok(0u64)
}
#[allow(dead_code)]
fn _seek_trait_marker<S>(_s: &S)
where
    S: Seek,
{
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    fn make_signal(ts: u64, tag: &str) -> DistillSignal {
        DistillSignal {
            recorded_at: ts,
            tool_name: Some(tag.to_string()),
            cwd: "/tmp/repo".to_string(),
            payload: Some(format!("payload-{tag}")),
        }
    }

    fn timestamps(signals: &[DistillSignal]) -> Vec<u64> {
        signals.iter().map(|s| s.recorded_at).collect()
    }

    #[test]
    fn append_creates_file_and_writes_one_line() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        append(runtime, &make_signal(1, "Bash"), DEFAULT_LRU_CAP).unwrap();

        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].tool_name.as_deref(), Some("Bash"));
        assert_eq!(signals[0].payload.as_deref(), Some("payload-Bash"));
    }

    #[test]
    fn append_supports_repeated_calls_in_order() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        for i in 0..5 {
            append(
                runtime,
                &make_signal(i as u64, &format!("tool{i}")),
                DEFAULT_LRU_CAP,
            )
            .unwrap();
        }
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 5);
        for (i, s) in signals.iter().enumerate() {
            assert_eq!(s.recorded_at, i as u64);
        }
    }

    #[test]
    fn drain_returns_signals_and_truncates_file() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        for i in 0..3 {
            append(runtime, &make_signal(i, "Edit"), DEFAULT_LRU_CAP).unwrap();
        }
        let drained = drain_all(runtime).unwrap();
        assert_eq!(drained.len(), 3);
        // After drain the file exists but is empty.
        let after = peek_all(runtime).unwrap();
        assert!(after.is_empty());
    }

    #[test]
    fn drain_returns_empty_when_file_missing() {
        let temp = tempdir().unwrap();
        let drained = drain_all(temp.path()).unwrap();
        assert!(drained.is_empty());
    }

    #[test]
    fn peek_returns_empty_when_file_missing() {
        let temp = tempdir().unwrap();
        let peeked = peek_all(temp.path()).unwrap();
        assert!(peeked.is_empty());
    }

    #[test]
    fn lru_cap_truncates_oldest_entries() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        let cap = 3;
        for i in 0..5 {
            append(runtime, &make_signal(i, "Bash"), cap).unwrap();
        }
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), cap);
        // Oldest two (0, 1) should be evicted; remaining (2, 3, 4)
        // should appear in order.
        assert_eq!(timestamps(&signals), vec![2, 3, 4]);
    }

    #[test]
    fn drain_after_lru_trim_returns_kept_entries_and_queue_stays_usable() {
        // Exercises the trim → drain → append sequence, i.e. the queue is
        // still a normal, appendable file after a trim rewrote it.
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        for i in 0..5 {
            append(runtime, &make_signal(i, "Bash"), 2).unwrap();
        }
        let drained = drain_all(runtime).unwrap();
        assert_eq!(timestamps(&drained), vec![3, 4]);

        append(runtime, &make_signal(99, "Edit"), 2).unwrap();
        let after = peek_all(runtime).unwrap();
        assert_eq!(timestamps(&after), vec![99]);
    }

    #[test]
    fn lru_cap_zero_disables_truncation() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        for i in 0..10 {
            append(runtime, &make_signal(i, "Edit"), 0).unwrap();
        }
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 10);
    }

    #[test]
    fn peek_skips_malformed_lines_without_error() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        // Pre-seed: one good line + corruption + one good line. Use
        // direct file write because append() would re-format
        // canonically.
        std::fs::create_dir_all(runtime).unwrap();
        let path = queue_path(runtime);
        let good = serde_json::to_string(&make_signal(1, "Bash")).unwrap();
        let good2 = serde_json::to_string(&make_signal(2, "Edit")).unwrap();
        std::fs::write(&path, format!("{good}\n{{ broken json\n\n{good2}\n")).unwrap();

        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 2);
        assert_eq!(signals[0].recorded_at, 1);
        assert_eq!(signals[1].recorded_at, 2);
    }

    #[test]
    fn peek_skips_non_utf8_lines() {
        // A line of invalid UTF-8 must be skipped like a malformed JSON
        // line, without aborting the read or stalling on the next line.
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        let path = queue_path(runtime);
        let mut bytes = serde_json::to_vec(&make_signal(1, "Bash")).unwrap();
        bytes.push(b'\n');
        bytes.extend_from_slice(&[0xff, 0xfe, b'\n']);
        bytes.extend_from_slice(&serde_json::to_vec(&make_signal(2, "Edit")).unwrap());
        bytes.push(b'\n');
        std::fs::write(&path, bytes).unwrap();

        let signals = peek_all(runtime).unwrap();
        assert_eq!(timestamps(&signals), vec![1, 2]);
    }

    #[test]
    fn append_after_corrupt_lru_cap_keeps_only_valid_recent() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        std::fs::create_dir_all(runtime).unwrap();
        let path = queue_path(runtime);

        // Plant 4 valid + 2 corrupt lines, then append one more with
        // cap=3. After append, only the 3 most-recent valid signals
        // should remain.
        let mut lines = String::new();
        for i in 0..4 {
            lines.push_str(&serde_json::to_string(&make_signal(i, "tool")).unwrap());
            lines.push('\n');
        }
        lines.push_str("not json at all\n");
        lines.push_str("{ broken\n");
        std::fs::write(&path, lines).unwrap();

        append(runtime, &make_signal(99, "fresh"), 3).unwrap();
        let kept = peek_all(runtime).unwrap();
        assert_eq!(kept.len(), 3);
        // After LRU enforcement we keep the *latest* 3 of the parsed
        // valid signals: 2, 3, then 99 (the fresh append).
        assert_eq!(timestamps(&kept), vec![2, 3, 99]);
    }

    #[test]
    fn flock_serializes_concurrent_appenders() {
        // Spawn a handful of threads each appending; with flock no
        // line should be torn (split mid-record). Verifying torn
        // writes is hard, so we settle for "all writes are
        // round-trippable JSON".
        let temp = tempdir().unwrap();
        let runtime = temp.path().to_path_buf();
        let writes_per_thread = 20;
        let mut handles = Vec::new();
        for t in 0..4 {
            let runtime = runtime.clone();
            handles.push(std::thread::spawn(move || {
                for i in 0..writes_per_thread {
                    let s = DistillSignal {
                        recorded_at: (t * writes_per_thread + i) as u64,
                        tool_name: Some(format!("t{t}")),
                        cwd: "/tmp".into(),
                        payload: Some(format!("payload-{t}-{i}")),
                    };
                    super::append(runtime.as_path(), &s, DEFAULT_LRU_CAP).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let signals = peek_all(runtime.as_path()).unwrap();
        // 4 threads * 20 writes = 80 entries, all parseable.
        assert_eq!(signals.len(), 80);
    }
}