use anyhow::{Context, Result};
use fs2::FileExt;
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
/// File name of the pending-distill queue, resolved relative to the runtime directory by
/// [`queue_path`].
pub const DEFAULT_QUEUE_FILE_NAME: &str = "distill-pending.queue";

/// Suggested `lru_cap` for [`append`]: once the queue holds more entries than this, only
/// the newest `DEFAULT_LRU_CAP` are kept.
pub const DEFAULT_LRU_CAP: usize = 100;
/// A single entry in the distill queue, stored on disk as one JSON object per line.
///
/// `Option` fields are left out of the JSON when `None` and read back as `None` when the
/// key is missing. Serde emits keys in field-declaration order, so keep the order stable.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DistillSignal {
    /// Caller-supplied recording time; this module only stores it (unit not fixed here).
    pub recorded_at: u64,
    /// Optional tool name associated with the signal.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_name: Option<String>,
    /// Working directory associated with the signal.
    pub cwd: String,
    /// Optional free-form payload.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub payload: Option<String>,
}
/// Returns the location of the pending-distill queue file inside `runtime_dir`.
pub fn queue_path(runtime_dir: &Path) -> PathBuf {
    let mut location = runtime_dir.to_path_buf();
    location.push(DEFAULT_QUEUE_FILE_NAME);
    location
}
/// Appends `signal` as one JSON line to the queue in `runtime_dir`, then trims the queue
/// to its newest `lru_cap` entries (`0` disables trimming).
///
/// The runtime directory and queue file are created if missing. The append and the trim
/// both run while an exclusive `flock` (via `fs2`) is held on the queue file.
///
/// # Errors
/// Returns an error if serialization fails, the directory or queue file cannot be
/// created/opened, the lock cannot be taken, or the write or cap enforcement fails.
pub fn append(runtime_dir: &Path, signal: &DistillSignal, lru_cap: usize) -> Result<()> {
    // Serialize before taking the lock so the critical section is I/O only. The newline
    // is part of the buffer so the record goes out in a single `write_all`. `writeln!` on
    // an unbuffered `File` issues one write for the text and another for the "\n".
    let mut record = serde_json::to_string(signal).context("serializing distill signal")?;
    record.push('\n');

    ensure_dir(runtime_dir)?;
    let path = queue_path(runtime_dir);
    let lock_file = OpenOptions::new()
        .create(true)
        .read(true)
        .append(true)
        .open(&path)
        .with_context(|| format!("opening queue {}", path.display()))?;
    lock_file
        .lock_exclusive()
        .with_context(|| format!("locking queue {}", path.display()))?;

    let result = (|| -> Result<()> {
        // Re-open by path so the record lands in whatever file currently lives at `path`.
        let mut writer = OpenOptions::new()
            .append(true)
            .open(&path)
            .with_context(|| format!("re-opening queue for write {}", path.display()))?;
        writer
            .write_all(record.as_bytes())
            .with_context(|| format!("appending to {}", path.display()))?;
        enforce_lru_cap(&path, lru_cap)
    })();

    // Best-effort: the lock is also released when `lock_file` is dropped.
    let _ = FileExt::unlock(&lock_file);
    result
}
/// Removes and returns every well-formed signal in the queue.
///
/// A missing queue file yields an empty vector. Otherwise the file is parsed and then
/// truncated to zero length, both while an exclusive `flock` is held on it. Lines that
/// fail to parse are reported during parsing and discarded by the truncation.
///
/// # Errors
/// Fails if the queue cannot be opened, locked, parsed, or truncated. When parsing fails
/// the file is not truncated.
pub fn drain_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
    let path = queue_path(runtime_dir);
    if !path.exists() {
        return Ok(Vec::new());
    }

    let lock_handle = OpenOptions::new()
        .read(true)
        .write(true)
        .open(&path)
        .with_context(|| format!("opening queue {}", path.display()))?;
    lock_handle
        .lock_exclusive()
        .with_context(|| format!("locking queue {}", path.display()))?;

    let drained = parse_signals_from_path(&path).and_then(|signals| {
        // Opening with `truncate(true)` is what empties the file; the handle is closed as
        // soon as this statement ends.
        OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(&path)
            .with_context(|| format!("truncating {}", path.display()))?;
        Ok(signals)
    });

    // Best-effort unlock; dropping `lock_handle` releases the lock as well.
    let _ = FileExt::unlock(&lock_handle);
    drained
}
/// Returns every well-formed signal in the queue without removing anything.
///
/// No lock is taken, so the result may reflect a queue that another caller is in the
/// middle of updating. A missing queue file yields an empty vector.
pub fn peek_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
    let path = queue_path(runtime_dir);
    if path.exists() {
        parse_signals_from_path(&path)
    } else {
        Ok(Vec::new())
    }
}
/// Makes sure `dir` (and any missing parents) exists.
///
/// `create_dir_all` already succeeds when `dir` is an existing directory, so no separate
/// `exists()` probe is needed. Without that probe, a non-directory sitting at `dir` is
/// reported here rather than surfacing later as a failure to open the queue file.
///
/// # Errors
/// Returns an error if the directory cannot be created.
fn ensure_dir(dir: &Path) -> Result<()> {
    std::fs::create_dir_all(dir)
        .with_context(|| format!("creating runtime dir {}", dir.display()))
}
/// Parses the queue file at `path` into signals, one JSON object per line.
///
/// Blank lines are skipped. A line that is not valid UTF-8, or not a valid
/// `DistillSignal`, is reported on stderr with its 1-based line number and skipped. One
/// bad record therefore does not hide the rest of the queue.
///
/// # Errors
/// Returns an error if the file cannot be opened, or if reading fails for any reason
/// other than invalid UTF-8. Those errors are propagated rather than skipped, for two
/// reasons:
/// - `io::Lines` does not stop after an error, so a persistent I/O failure would otherwise
///   be reported on every iteration forever.
/// - A silently shortened read would let a caller that truncates afterwards (such as
///   `drain_all`) discard entries it never returned.
fn parse_signals_from_path(path: &Path) -> Result<Vec<DistillSignal>> {
    let f = File::open(path).with_context(|| format!("opening {}", path.display()))?;
    let reader = BufReader::new(f);
    let mut signals = Vec::new();
    for (idx, line) in reader.lines().enumerate() {
        let line_no = idx + 1;
        let raw = match line {
            Ok(raw) => raw,
            // `read_line` has already consumed the offending bytes, so skipping moves on
            // to the next line instead of re-reading this one.
            Err(err) if err.kind() == std::io::ErrorKind::InvalidData => {
                eprintln!(
                    "[spool queue] read error at {}:{line_no}: {err}",
                    path.display()
                );
                continue;
            }
            Err(err) => {
                return Err(err)
                    .with_context(|| format!("reading {}:{line_no}", path.display()));
            }
        };
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            continue;
        }
        match serde_json::from_str::<DistillSignal>(trimmed) {
            Ok(s) => signals.push(s),
            Err(err) => {
                eprintln!(
                    "[spool queue] malformed entry at {}:{line_no}: {err}",
                    path.display()
                );
            }
        }
    }
    Ok(signals)
}
/// Trims the queue at `path` down to its newest `lru_cap` well-formed signals.
///
/// A cap of `0` disables trimming. When the queue exceeds the cap, it is rewritten with
/// only the last `lru_cap` parsed signals, which also drops any malformed lines. At or
/// under the cap, the file is left exactly as it is.
fn enforce_lru_cap(path: &Path, lru_cap: usize) -> Result<()> {
    if lru_cap == 0 {
        return Ok(());
    }
    let all = parse_signals_from_path(path)?;
    match all.len().checked_sub(lru_cap) {
        Some(excess) if excess > 0 => rewrite_queue(path, &all[excess..]),
        _ => Ok(()),
    }
}
fn rewrite_queue(path: &Path, signals: &[DistillSignal]) -> Result<()> {
let tmp = path.with_extension("spool-tmp");
let mut tmp_file = File::create(&tmp).with_context(|| format!("creating {}", tmp.display()))?;
for s in signals {
let line = serde_json::to_string(s).context("serializing distill signal")?;
writeln!(tmp_file, "{}", line)?;
}
tmp_file.flush().ok();
drop(tmp_file);
std::fs::rename(&tmp, path)
.with_context(|| format!("renaming {} -> {}", tmp.display(), path.display()))?;
Ok(())
}
// NOTE(review): neither helper has a caller in this file. They appear to exist only so that
// the `Seek` and `SeekFrom` imports at the top of the file count as used, which is why
// `dead_code` is allowed. Consider deleting both together with those imports.

/// Builds and discards a `SeekFrom` value and always returns `Ok(0)`; `_f` is never used.
#[allow(dead_code)]
fn _seek_anchor(_f: &mut File) -> std::io::Result<u64> {
    let _: SeekFrom = SeekFrom::Start(0);
    Ok(0)
}

/// Accepts any `Seek` implementor and does nothing with it.
#[allow(dead_code)]
fn _seek_trait_marker<S: Seek>(_s: &S) {}
// Unit tests for the queue. Every test runs inside its own `tempfile` directory, so no two
// tests share a queue file.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Builds a signal with timestamp `ts`, `tool_name == tag`, and payload `payload-{tag}`.
    fn make_signal(ts: u64, tag: &str) -> DistillSignal {
        DistillSignal {
            recorded_at: ts,
            tool_name: Some(tag.to_string()),
            cwd: "/tmp/repo".to_string(),
            payload: Some(format!("payload-{tag}")),
        }
    }

    // A first append into an empty runtime dir creates the queue holding exactly that entry.
    #[test]
    fn append_creates_file_and_writes_one_line() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        append(runtime, &make_signal(1, "Bash"), DEFAULT_LRU_CAP).unwrap();
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 1);
        assert_eq!(signals[0].tool_name.as_deref(), Some("Bash"));
        assert_eq!(signals[0].payload.as_deref(), Some("payload-Bash"));
    }

    // Entries are read back in the order they were appended.
    #[test]
    fn append_supports_repeated_calls_in_order() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        for i in 0..5 {
            append(
                runtime,
                &make_signal(i as u64, &format!("tool{i}")),
                DEFAULT_LRU_CAP,
            )
            .unwrap();
        }
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 5);
        for (i, s) in signals.iter().enumerate() {
            assert_eq!(s.recorded_at, i as u64);
        }
    }

    // Draining returns every entry and leaves nothing behind for a later peek.
    #[test]
    fn drain_returns_signals_and_truncates_file() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        for i in 0..3 {
            append(runtime, &make_signal(i, "Edit"), DEFAULT_LRU_CAP).unwrap();
        }
        let drained = drain_all(runtime).unwrap();
        assert_eq!(drained.len(), 3);
        let after = peek_all(runtime).unwrap();
        assert!(after.is_empty());
    }

    // Draining a runtime dir that has no queue file is not an error.
    #[test]
    fn drain_returns_empty_when_file_missing() {
        let temp = tempdir().unwrap();
        let drained = drain_all(temp.path()).unwrap();
        assert!(drained.is_empty());
    }

    // With cap 3, five appends keep only the three newest entries (timestamps 2, 3, 4).
    #[test]
    fn lru_cap_truncates_oldest_entries() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        let cap = 3;
        for i in 0..5 {
            append(runtime, &make_signal(i, "Bash"), cap).unwrap();
        }
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), cap);
        let timestamps: Vec<u64> = signals.iter().map(|s| s.recorded_at).collect();
        assert_eq!(timestamps, vec![2, 3, 4]);
    }

    // A cap of 0 means "no cap": all ten appends are kept.
    #[test]
    fn lru_cap_zero_disables_truncation() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        for i in 0..10 {
            append(runtime, &make_signal(i, "Edit"), 0).unwrap();
        }
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 10);
    }

    // A hand-written file with one broken line and one blank line between two valid
    // entries: peek returns just the two valid entries and does not fail.
    #[test]
    fn peek_skips_malformed_lines_without_error() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        std::fs::create_dir_all(runtime).unwrap();
        let path = queue_path(runtime);
        let good = serde_json::to_string(&make_signal(1, "Bash")).unwrap();
        let good2 = serde_json::to_string(&make_signal(2, "Edit")).unwrap();
        std::fs::write(&path, format!("{good}\n{{ broken json\n\n{good2}\n")).unwrap();
        let signals = peek_all(runtime).unwrap();
        assert_eq!(signals.len(), 2);
        assert_eq!(signals[0].recorded_at, 1);
        assert_eq!(signals[1].recorded_at, 2);
    }

    // The file starts with four valid entries followed by two malformed lines. Appending a
    // fifth valid entry with cap 3 triggers a rewrite. Only the newest three valid entries
    // survive, and the malformed lines are dropped.
    #[test]
    fn append_after_corrupt_lru_cap_keeps_only_valid_recent() {
        let temp = tempdir().unwrap();
        let runtime = temp.path();
        std::fs::create_dir_all(runtime).unwrap();
        let path = queue_path(runtime);
        let mut lines = String::new();
        for i in 0..4 {
            lines.push_str(&serde_json::to_string(&make_signal(i, "tool")).unwrap());
            lines.push('\n');
        }
        lines.push_str("not json at all\n");
        lines.push_str("{ broken\n");
        std::fs::write(&path, lines).unwrap();
        append(runtime, &make_signal(99, "fresh"), 3).unwrap();
        let kept = peek_all(runtime).unwrap();
        assert_eq!(kept.len(), 3);
        let ts: Vec<u64> = kept.iter().map(|s| s.recorded_at).collect();
        assert_eq!(ts, vec![2, 3, 99]);
    }

    // Four threads each append 20 signals. Every `append` call opens its own handle and
    // takes its own exclusive lock, and the test checks that none of the 80 records are
    // lost. Note: 80 < DEFAULT_LRU_CAP (100), so the LRU rewrite path is never reached
    // here. This does not exercise locking across a cap-triggered rewrite.
    #[test]
    fn flock_serializes_concurrent_appenders() {
        let temp = tempdir().unwrap();
        let runtime = temp.path().to_path_buf();
        let writes_per_thread = 20;
        let mut handles = Vec::new();
        for t in 0..4 {
            let runtime = runtime.clone();
            handles.push(std::thread::spawn(move || {
                for i in 0..writes_per_thread {
                    let s = DistillSignal {
                        recorded_at: (t * writes_per_thread + i) as u64,
                        tool_name: Some(format!("t{t}")),
                        cwd: "/tmp".into(),
                        payload: Some(format!("payload-{t}-{i}")),
                    };
                    super::append(runtime.as_path(), &s, DEFAULT_LRU_CAP).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let signals = peek_all(runtime.as_path()).unwrap();
        assert_eq!(signals.len(), 80);
    }
}