Skip to main content

spool/
distill_queue.rs

1//! Distill-pending queue — append-only JSONL log under
2//! `<cwd>/.spool/distill-pending.queue` that Stop / PreCompact hooks
3//! drain into candidate / accepted memory records.
4//!
5//! ## Why a separate file (vs. piggybacking on the ledger)?
6//! - The ledger is the lifecycle truth source. Half-cooked signals
7//!   (raw tool outputs, partial prompts) MUST NOT pollute it.
//! - The queue is intentionally lossy under pressure: it keeps at most
//!   `DEFAULT_LRU_CAP` (100) entries by default and silently drops the
//!   oldest ones beyond that. The ledger never drops.
10//! - The queue file is project-local; deleting `.spool/` on a fresh
11//!   clone safely wipes accumulated signals without touching the
12//!   shared vault ledger.
13//!
14//! ## Concurrency model
15//! Multiple Claude Code instances may run hooks against the same
16//! `.spool/` (e.g. user opens two terminals in the same repo). We use
17//! `fs2::FileExt::lock_exclusive` (POSIX flock) to serialize all
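//! mutating operations. flock locks are advisory and are held per open
//! file description (they cover the file that was opened, not the path),
//! which is sufficient for single-host concurrency. We do NOT support
//! cross-host network FS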
20//! concurrency (out of scope per ADR-0001 single-user assumption).
21//!
22//! ## File format
//! One JSON object per line. Each entry has the shape produced by
//! `hook_runtime::post_tool_use::DistillSignalEnvelope` (mirrored in this
//! module as [`DistillSignal`]):
25//! ```text
26//! {"recorded_at": <unix>, "tool_name": <opt>, "cwd": <abs>, "payload": <opt>}
27//! ```
28//!
29//! ## Operations
//! - [`append`]: add a single envelope at the end; if the number of
//!   parseable entries exceeds the cap afterwards, rewrite the file keeping
//!   only the most recent `cap` entries (malformed lines are dropped by
//!   that rewrite).
//! - [`drain_all`]: read every parseable entry, then truncate the file to
//!   empty (this is how the Stop hook consumes the queue).
35//! - [`peek_all`]: read without truncating (debugging / doctor).
36
37use anyhow::{Context, Result};
38use fs2::FileExt;
39use serde::{Deserialize, Serialize};
40use std::fs::{File, OpenOptions};
41use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43
/// File name of the queue inside the runtime directory (see [`queue_path`]).
pub const DEFAULT_QUEUE_FILE_NAME: &str = "distill-pending.queue";
/// Default entry cap passed to [`append`]. Despite the "LRU" name, eviction
/// is by position: once the parseable entry count exceeds the cap, the
/// oldest entries in the file are dropped. A cap of `0` disables trimming.
pub const DEFAULT_LRU_CAP: usize = 100;
46
47/// One signal envelope written by the post-tool-use hook.
48///
49/// Mirrors `hook_runtime::post_tool_use::DistillSignalEnvelope` so the
50/// two layers serialize/deserialize each other 1:1. We keep a separate
51/// type here so the queue API doesn't pull `hook_runtime` into the
52/// dependency graph (cycles).
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
54pub struct DistillSignal {
55    pub recorded_at: u64,
56    #[serde(default, skip_serializing_if = "Option::is_none")]
57    pub tool_name: Option<String>,
58    pub cwd: String,
59    #[serde(default, skip_serializing_if = "Option::is_none")]
60    pub payload: Option<String>,
61}
62
63/// Resolve the queue file path under `<runtime_dir>/distill-pending.queue`.
64pub fn queue_path(runtime_dir: &Path) -> PathBuf {
65    runtime_dir.join(DEFAULT_QUEUE_FILE_NAME)
66}
67
68/// Append a single signal envelope to the queue, then enforce the LRU
69/// cap. Atomic w.r.t. concurrent appenders thanks to flock.
70pub fn append(runtime_dir: &Path, signal: &DistillSignal, lru_cap: usize) -> Result<()> {
71    ensure_dir(runtime_dir)?;
72    let path = queue_path(runtime_dir);
73    let file = OpenOptions::new()
74        .create(true)
75        .read(true)
76        .append(true)
77        .open(&path)
78        .with_context(|| format!("opening queue {}", path.display()))?;
79    file.lock_exclusive()
80        .with_context(|| format!("locking queue {}", path.display()))?;
81
82    let result = (|| -> Result<()> {
83        let line = serde_json::to_string(signal).context("serializing distill signal")?;
84        // Re-open in append mode (the locked handle was opened with
85        // append true, so writeln on it appends — but we need a
86        // mutable handle. Re-open to side-step lifetime hassles).
87        let mut writer = OpenOptions::new()
88            .append(true)
89            .open(&path)
90            .with_context(|| format!("re-opening queue for write {}", path.display()))?;
91        writeln!(writer, "{}", line).with_context(|| format!("appending to {}", path.display()))?;
92        writer.flush().ok();
93
94        // Enforce LRU cap. Reading + rewriting is a separate pass; we
95        // hold the exclusive lock so no concurrent appender will race.
96        enforce_lru_cap(&path, lru_cap)?;
97        Ok(())
98    })();
99
100    let _ = FileExt::unlock(&file);
101    result
102}
103
104/// Read all lines and atomically truncate the queue to empty. The
105/// caller (typically Stop hook) processes the returned signals into
106/// the lifecycle ledger.
107pub fn drain_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
108    let path = queue_path(runtime_dir);
109    if !path.exists() {
110        return Ok(Vec::new());
111    }
112    let file = OpenOptions::new()
113        .read(true)
114        .write(true)
115        .open(&path)
116        .with_context(|| format!("opening queue {}", path.display()))?;
117    file.lock_exclusive()
118        .with_context(|| format!("locking queue {}", path.display()))?;
119
120    let result = (|| -> Result<Vec<DistillSignal>> {
121        let signals = parse_signals_from_path(&path)?;
122        // Truncate. We re-open with truncate(true) instead of
123        // set_len(0) on the locked handle to keep cursor / fd state
124        // simple.
125        let mut truncator = OpenOptions::new()
126            .write(true)
127            .truncate(true)
128            .open(&path)
129            .with_context(|| format!("truncating {}", path.display()))?;
130        truncator.flush().ok();
131        Ok(signals)
132    })();
133
134    let _ = FileExt::unlock(&file);
135    result
136}
137
138/// Read all lines without modifying the queue. Useful for doctor /
139/// debugging.
140pub fn peek_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
141    let path = queue_path(runtime_dir);
142    if !path.exists() {
143        return Ok(Vec::new());
144    }
145    parse_signals_from_path(&path)
146}
147
148fn ensure_dir(dir: &Path) -> Result<()> {
149    if !dir.exists() {
150        std::fs::create_dir_all(dir)
151            .with_context(|| format!("creating runtime dir {}", dir.display()))?;
152    }
153    Ok(())
154}
155
156fn parse_signals_from_path(path: &Path) -> Result<Vec<DistillSignal>> {
157    let f = File::open(path).with_context(|| format!("opening {}", path.display()))?;
158    let reader = BufReader::new(f);
159    let mut signals = Vec::new();
160    for (idx, line) in reader.lines().enumerate() {
161        let line_no = idx + 1;
162        let raw = match line {
163            Ok(raw) => raw,
164            Err(err) => {
165                eprintln!(
166                    "[spool queue] read error at {}:{line_no}: {err}",
167                    path.display()
168                );
169                continue;
170            }
171        };
172        let trimmed = raw.trim();
173        if trimmed.is_empty() {
174            continue;
175        }
176        match serde_json::from_str::<DistillSignal>(trimmed) {
177            Ok(s) => signals.push(s),
178            Err(err) => {
179                // Same skip-and-warn policy as the lifecycle ledger
180                // (see `LifecycleStore::read_all`): one corrupt line
181                // does not down the whole queue.
182                eprintln!(
183                    "[spool queue] malformed entry at {}:{line_no}: {err}",
184                    path.display()
185                );
186            }
187        }
188    }
189    Ok(signals)
190}
191
192fn enforce_lru_cap(path: &Path, lru_cap: usize) -> Result<()> {
193    if lru_cap == 0 {
194        return Ok(());
195    }
196    let signals = parse_signals_from_path(path)?;
197    if signals.len() <= lru_cap {
198        return Ok(());
199    }
200    let kept = &signals[signals.len() - lru_cap..];
201    rewrite_queue(path, kept)?;
202    Ok(())
203}
204
205fn rewrite_queue(path: &Path, signals: &[DistillSignal]) -> Result<()> {
206    // Atomic rewrite: write to <path>.spool-tmp, rename. Caller holds
207    // the flock so no appender races during the rename window.
208    let tmp = path.with_extension("spool-tmp");
209    let mut tmp_file = File::create(&tmp).with_context(|| format!("creating {}", tmp.display()))?;
210    for s in signals {
211        let line = serde_json::to_string(s).context("serializing distill signal")?;
212        writeln!(tmp_file, "{}", line)?;
213    }
214    tmp_file.flush().ok();
215    drop(tmp_file);
216    std::fs::rename(&tmp, path)
217        .with_context(|| format!("renaming {} -> {}", tmp.display(), path.display()))?;
218    Ok(())
219}
220
// Placeholders only: neither function is called. They reference `Seek` and
// `SeekFrom` so those names in the `use` list do not trip `unused_imports`;
// the imports are kept for possible seek-based reads (e.g. tail-from-offset).
#[allow(dead_code)] // placeholder, see comment above
fn _seek_anchor(_f: &mut File) -> std::io::Result<u64> {
    let _origin: SeekFrom = SeekFrom::Start(0);
    Ok(0u64)
}

#[allow(dead_code)] // placeholder, see comment above
fn _seek_trait_marker<S>(_s: &S)
where
    S: Seek,
{
}
230
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Signal with tool name `tag`, payload `payload-<tag>` and cwd `/tmp/repo`.
    fn make_signal(ts: u64, tag: &str) -> DistillSignal {
        let tool = tag.to_owned();
        let payload = format!("payload-{tag}");
        DistillSignal {
            recorded_at: ts,
            tool_name: Some(tool),
            cwd: String::from("/tmp/repo"),
            payload: Some(payload),
        }
    }

    /// The `recorded_at` stamps of `signals`, in order.
    fn stamps(signals: &[DistillSignal]) -> Vec<u64> {
        signals.iter().map(|signal| signal.recorded_at).collect()
    }

    #[test]
    fn append_creates_file_and_writes_one_line() {
        let dir = tempdir().unwrap();
        append(dir.path(), &make_signal(1, "Bash"), DEFAULT_LRU_CAP).unwrap();

        let queued = peek_all(dir.path()).unwrap();
        assert_eq!(queued.len(), 1);
        let only = &queued[0];
        assert_eq!(only.tool_name.as_deref(), Some("Bash"));
        assert_eq!(only.payload.as_deref(), Some("payload-Bash"));
    }

    #[test]
    fn append_supports_repeated_calls_in_order() {
        let dir = tempdir().unwrap();
        (0..5u64).for_each(|i| {
            append(dir.path(), &make_signal(i, &format!("tool{i}")), DEFAULT_LRU_CAP).unwrap();
        });

        let queued = peek_all(dir.path()).unwrap();
        assert_eq!(queued.len(), 5);
        assert_eq!(stamps(&queued), (0..5u64).collect::<Vec<_>>());
    }

    #[test]
    fn drain_returns_signals_and_truncates_file() {
        let dir = tempdir().unwrap();
        for ts in 0..3 {
            append(dir.path(), &make_signal(ts, "Edit"), DEFAULT_LRU_CAP).unwrap();
        }
        assert_eq!(drain_all(dir.path()).unwrap().len(), 3);
        // The queue file survives the drain but holds no entries.
        assert!(peek_all(dir.path()).unwrap().is_empty());
    }

    #[test]
    fn drain_returns_empty_when_file_missing() {
        let dir = tempdir().unwrap();
        assert!(drain_all(dir.path()).unwrap().is_empty());
    }

    #[test]
    fn lru_cap_truncates_oldest_entries() {
        let dir = tempdir().unwrap();
        let cap = 3;
        for ts in 0..5 {
            append(dir.path(), &make_signal(ts, "Bash"), cap).unwrap();
        }

        let queued = peek_all(dir.path()).unwrap();
        assert_eq!(queued.len(), cap);
        // 0 and 1 are evicted; 2, 3 and 4 survive in insertion order.
        assert_eq!(stamps(&queued), vec![2, 3, 4]);
    }

    #[test]
    fn lru_cap_zero_disables_truncation() {
        let dir = tempdir().unwrap();
        for ts in 0..10 {
            append(dir.path(), &make_signal(ts, "Edit"), 0).unwrap();
        }
        assert_eq!(peek_all(dir.path()).unwrap().len(), 10);
    }

    #[test]
    fn peek_skips_malformed_lines_without_error() {
        let dir = tempdir().unwrap();
        let runtime = dir.path();
        // Seed the file by hand: append() only ever writes canonical lines.
        std::fs::create_dir_all(runtime).unwrap();
        let first = serde_json::to_string(&make_signal(1, "Bash")).unwrap();
        let second = serde_json::to_string(&make_signal(2, "Edit")).unwrap();
        let seeded: String = [first.as_str(), "{ broken json", "", second.as_str()]
            .iter()
            .map(|line| format!("{line}\n"))
            .collect();
        std::fs::write(queue_path(runtime), seeded).unwrap();

        let queued = peek_all(runtime).unwrap();
        assert_eq!(stamps(&queued), vec![1, 2]);
    }

    #[test]
    fn append_after_corrupt_lru_cap_keeps_only_valid_recent() {
        let dir = tempdir().unwrap();
        let runtime = dir.path();
        std::fs::create_dir_all(runtime).unwrap();

        // Seed 4 valid lines followed by 2 corrupt ones, then append a fresh
        // signal with cap = 3: only the newest 3 valid signals may remain.
        let mut seeded: String = (0..4)
            .map(|ts| serde_json::to_string(&make_signal(ts, "tool")).unwrap() + "\n")
            .collect();
        seeded.push_str("not json at all\n{ broken\n");
        std::fs::write(queue_path(runtime), seeded).unwrap();

        append(runtime, &make_signal(99, "fresh"), 3).unwrap();
        let kept = peek_all(runtime).unwrap();
        assert_eq!(kept.len(), 3);
        // Valid signals are 0..=3 plus 99; the newest three are 2, 3, 99.
        assert_eq!(stamps(&kept), vec![2, 3, 99]);
    }

    #[test]
    fn flock_serializes_concurrent_appenders() {
        // Several threads append at once. Detecting a torn record directly is
        // hard, so check the weaker property that every write round-trips as
        // JSON and none goes missing.
        let dir = tempdir().unwrap();
        let runtime = dir.path().to_path_buf();
        let writes_per_thread = 20;
        let workers: Vec<_> = (0..4)
            .map(|t| {
                let runtime = runtime.clone();
                std::thread::spawn(move || {
                    for i in 0..writes_per_thread {
                        let signal = DistillSignal {
                            recorded_at: (t * writes_per_thread + i) as u64,
                            tool_name: Some(format!("t{t}")),
                            cwd: "/tmp".into(),
                            payload: Some(format!("payload-{t}-{i}")),
                        };
                        super::append(&runtime, &signal, DEFAULT_LRU_CAP).unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        // 4 threads x 20 writes = 80 entries, all parseable.
        assert_eq!(peek_all(&runtime).unwrap().len(), 80);
    }
}