1use anyhow::{Context, Result};
38use fs2::FileExt;
39use serde::{Deserialize, Serialize};
40use std::fs::{File, OpenOptions};
41use std::io::{BufRead, BufReader, Seek, SeekFrom, Write};
42use std::path::{Path, PathBuf};
43
/// File name of the queue inside the runtime directory; [`queue_path`] joins
/// it onto the directory it is given.
pub const DEFAULT_QUEUE_FILE_NAME: &str = "distill-pending.queue";
/// Number of entries to retain when used as the `lru_cap` argument of
/// [`append`] (the tests in this file pass it that way). A cap of `0` disables
/// trimming entirely.
pub const DEFAULT_LRU_CAP: usize = 100;
46
/// One queue entry. [`append`] serializes it with `serde_json` and stores it
/// as a single line of the queue file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DistillSignal {
    /// Caller-supplied stamp for the entry.
    // NOTE(review): presumably a timestamp; epoch and units are not
    // established anywhere in this file — confirm against the producer.
    pub recorded_at: u64,
    /// Optional tool name. Left out of the JSON when `None`; a missing key
    /// deserializes back to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tool_name: Option<String>,
    /// Working directory recorded with the entry (always serialized).
    pub cwd: String,
    /// Optional free-form payload. Left out of the JSON when `None`; a missing
    /// key deserializes back to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub payload: Option<String>,
}
62
63pub fn queue_path(runtime_dir: &Path) -> PathBuf {
65 runtime_dir.join(DEFAULT_QUEUE_FILE_NAME)
66}
67
68pub fn append(runtime_dir: &Path, signal: &DistillSignal, lru_cap: usize) -> Result<()> {
71 ensure_dir(runtime_dir)?;
72 let path = queue_path(runtime_dir);
73 let file = OpenOptions::new()
74 .create(true)
75 .read(true)
76 .append(true)
77 .open(&path)
78 .with_context(|| format!("opening queue {}", path.display()))?;
79 file.lock_exclusive()
80 .with_context(|| format!("locking queue {}", path.display()))?;
81
82 let result = (|| -> Result<()> {
83 let line = serde_json::to_string(signal).context("serializing distill signal")?;
84 let mut writer = OpenOptions::new()
88 .append(true)
89 .open(&path)
90 .with_context(|| format!("re-opening queue for write {}", path.display()))?;
91 writeln!(writer, "{}", line).with_context(|| format!("appending to {}", path.display()))?;
92 writer.flush().ok();
93
94 enforce_lru_cap(&path, lru_cap)?;
97 Ok(())
98 })();
99
100 let _ = FileExt::unlock(&file);
101 result
102}
103
104pub fn drain_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
108 let path = queue_path(runtime_dir);
109 if !path.exists() {
110 return Ok(Vec::new());
111 }
112 let file = OpenOptions::new()
113 .read(true)
114 .write(true)
115 .open(&path)
116 .with_context(|| format!("opening queue {}", path.display()))?;
117 file.lock_exclusive()
118 .with_context(|| format!("locking queue {}", path.display()))?;
119
120 let result = (|| -> Result<Vec<DistillSignal>> {
121 let signals = parse_signals_from_path(&path)?;
122 let mut truncator = OpenOptions::new()
126 .write(true)
127 .truncate(true)
128 .open(&path)
129 .with_context(|| format!("truncating {}", path.display()))?;
130 truncator.flush().ok();
131 Ok(signals)
132 })();
133
134 let _ = FileExt::unlock(&file);
135 result
136}
137
138pub fn peek_all(runtime_dir: &Path) -> Result<Vec<DistillSignal>> {
141 let path = queue_path(runtime_dir);
142 if !path.exists() {
143 return Ok(Vec::new());
144 }
145 parse_signals_from_path(&path)
146}
147
148fn ensure_dir(dir: &Path) -> Result<()> {
149 if !dir.exists() {
150 std::fs::create_dir_all(dir)
151 .with_context(|| format!("creating runtime dir {}", dir.display()))?;
152 }
153 Ok(())
154}
155
156fn parse_signals_from_path(path: &Path) -> Result<Vec<DistillSignal>> {
157 let f = File::open(path).with_context(|| format!("opening {}", path.display()))?;
158 let reader = BufReader::new(f);
159 let mut signals = Vec::new();
160 for (idx, line) in reader.lines().enumerate() {
161 let line_no = idx + 1;
162 let raw = match line {
163 Ok(raw) => raw,
164 Err(err) => {
165 eprintln!(
166 "[spool queue] read error at {}:{line_no}: {err}",
167 path.display()
168 );
169 continue;
170 }
171 };
172 let trimmed = raw.trim();
173 if trimmed.is_empty() {
174 continue;
175 }
176 match serde_json::from_str::<DistillSignal>(trimmed) {
177 Ok(s) => signals.push(s),
178 Err(err) => {
179 eprintln!(
183 "[spool queue] malformed entry at {}:{line_no}: {err}",
184 path.display()
185 );
186 }
187 }
188 }
189 Ok(signals)
190}
191
192fn enforce_lru_cap(path: &Path, lru_cap: usize) -> Result<()> {
193 if lru_cap == 0 {
194 return Ok(());
195 }
196 let signals = parse_signals_from_path(path)?;
197 if signals.len() <= lru_cap {
198 return Ok(());
199 }
200 let kept = &signals[signals.len() - lru_cap..];
201 rewrite_queue(path, kept)?;
202 Ok(())
203}
204
205fn rewrite_queue(path: &Path, signals: &[DistillSignal]) -> Result<()> {
206 let tmp = path.with_extension("spool-tmp");
209 let mut tmp_file = File::create(&tmp).with_context(|| format!("creating {}", tmp.display()))?;
210 for s in signals {
211 let line = serde_json::to_string(s).context("serializing distill signal")?;
212 writeln!(tmp_file, "{}", line)?;
213 }
214 tmp_file.flush().ok();
215 drop(tmp_file);
216 std::fs::rename(&tmp, path)
217 .with_context(|| format!("renaming {} -> {}", tmp.display(), path.display()))?;
218 Ok(())
219}
220
// Not called anywhere in the visible code, hence the `dead_code` allowance.
// It builds a `SeekFrom` value, discards it, and reports offset 0 without
// touching the file; it appears to exist only to keep the `SeekFrom` import
// referenced.
#[allow(dead_code)]
fn _seek_anchor(_f: &mut File) -> std::io::Result<u64> {
    let _origin = SeekFrom::Start(0);
    Ok(0)
}
// Not called anywhere in the visible code, hence the `dead_code` allowance.
// The empty body does nothing; it appears to exist only to keep the `Seek`
// trait import referenced.
#[allow(dead_code)]
fn _seek_trait_marker<S>(_s: &S)
where
    S: Seek,
{
}
230
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a signal stamped `ts` whose tool name and payload derive from `tag`.
    fn make_signal(ts: u64, tag: &str) -> DistillSignal {
        let tool_name = Some(tag.to_string());
        let payload = Some(format!("payload-{tag}"));
        DistillSignal {
            recorded_at: ts,
            tool_name,
            cwd: "/tmp/repo".to_string(),
            payload,
        }
    }

    /// Collects the `recorded_at` stamps of `signals` in order.
    fn stamps(signals: &[DistillSignal]) -> Vec<u64> {
        signals.iter().map(|s| s.recorded_at).collect()
    }

    #[test]
    fn append_creates_file_and_writes_one_line() {
        let scratch = tempdir().unwrap();
        append(scratch.path(), &make_signal(1, "Bash"), DEFAULT_LRU_CAP).unwrap();

        let queued = peek_all(scratch.path()).unwrap();
        assert_eq!(queued.len(), 1);
        let only = &queued[0];
        assert_eq!(only.tool_name.as_deref(), Some("Bash"));
        assert_eq!(only.payload.as_deref(), Some("payload-Bash"));
    }

    #[test]
    fn append_supports_repeated_calls_in_order() {
        let scratch = tempdir().unwrap();
        let dir = scratch.path();
        for i in 0..5u64 {
            let signal = make_signal(i, &format!("tool{i}"));
            append(dir, &signal, DEFAULT_LRU_CAP).unwrap();
        }

        let queued = peek_all(dir).unwrap();
        assert_eq!(queued.len(), 5);
        // Entries come back in the order they were appended.
        assert_eq!(stamps(&queued), vec![0, 1, 2, 3, 4]);
    }

    #[test]
    fn drain_returns_signals_and_truncates_file() {
        let scratch = tempdir().unwrap();
        let dir = scratch.path();
        (0..3).for_each(|i| append(dir, &make_signal(i, "Edit"), DEFAULT_LRU_CAP).unwrap());

        assert_eq!(drain_all(dir).unwrap().len(), 3);
        // A drain leaves nothing behind to peek at.
        assert!(peek_all(dir).unwrap().is_empty());
    }

    #[test]
    fn drain_returns_empty_when_file_missing() {
        let scratch = tempdir().unwrap();
        assert!(drain_all(scratch.path()).unwrap().is_empty());
    }

    #[test]
    fn lru_cap_truncates_oldest_entries() {
        let scratch = tempdir().unwrap();
        let dir = scratch.path();
        let cap: usize = 3;
        (0..5).for_each(|i| append(dir, &make_signal(i, "Bash"), cap).unwrap());

        let queued = peek_all(dir).unwrap();
        assert_eq!(queued.len(), cap);
        // Only the newest `cap` entries survive.
        assert_eq!(stamps(&queued), vec![2, 3, 4]);
    }

    #[test]
    fn lru_cap_zero_disables_truncation() {
        let scratch = tempdir().unwrap();
        let dir = scratch.path();
        (0..10).for_each(|i| append(dir, &make_signal(i, "Edit"), 0).unwrap());

        assert_eq!(peek_all(dir).unwrap().len(), 10);
    }

    #[test]
    fn peek_skips_malformed_lines_without_error() {
        let scratch = tempdir().unwrap();
        let dir = scratch.path();
        std::fs::create_dir_all(dir).unwrap();

        let good = serde_json::to_string(&make_signal(1, "Bash")).unwrap();
        let good2 = serde_json::to_string(&make_signal(2, "Edit")).unwrap();
        // Layout: valid entry, broken JSON, blank line, valid entry.
        let contents = format!("{good}\n{{ broken json\n\n{good2}\n");
        std::fs::write(queue_path(dir), contents).unwrap();

        let queued = peek_all(dir).unwrap();
        assert_eq!(queued.len(), 2);
        assert_eq!(queued[0].recorded_at, 1);
        assert_eq!(queued[1].recorded_at, 2);
    }

    #[test]
    fn append_after_corrupt_lru_cap_keeps_only_valid_recent() {
        let scratch = tempdir().unwrap();
        let dir = scratch.path();
        std::fs::create_dir_all(dir).unwrap();

        // Seed the queue with four valid entries followed by two garbage lines.
        let mut seeded: String = (0..4)
            .map(|i| serde_json::to_string(&make_signal(i, "tool")).unwrap() + "\n")
            .collect();
        seeded.push_str("not json at all\n");
        seeded.push_str("{ broken\n");
        std::fs::write(queue_path(dir), seeded).unwrap();

        append(dir, &make_signal(99, "fresh"), 3).unwrap();

        let kept = peek_all(dir).unwrap();
        assert_eq!(kept.len(), 3);
        // The garbage is gone and only the three newest valid entries remain.
        assert_eq!(stamps(&kept), vec![2, 3, 99]);
    }

    #[test]
    fn flock_serializes_concurrent_appenders() {
        let scratch = tempdir().unwrap();
        let runtime = scratch.path().to_path_buf();
        let writes_per_thread = 20;

        // `collect` spawns every worker before any of them is joined.
        let workers: Vec<_> = (0..4)
            .map(|t| {
                let runtime = runtime.clone();
                std::thread::spawn(move || {
                    for i in 0..writes_per_thread {
                        let s = DistillSignal {
                            recorded_at: (t * writes_per_thread + i) as u64,
                            tool_name: Some(format!("t{t}")),
                            cwd: "/tmp".into(),
                            payload: Some(format!("payload-{t}-{i}")),
                        };
                        super::append(runtime.as_path(), &s, DEFAULT_LRU_CAP).unwrap();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        // 4 threads x 20 writes, all of which must have landed as whole lines.
        let signals = peek_all(runtime.as_path()).unwrap();
        assert_eq!(signals.len(), 80);
    }
}