1use serde::{Deserialize, Serialize};
16use std::fs;
17use std::path::{Path, PathBuf};
18use std::time::SystemTime;
19
/// Version tag for the cycle packet cache.
///
/// [`cycle_packet_watermark_key`] mixes this string into every key it derives, so
/// changing the value changes all keys produced by that function.
pub const CYCLE_PACKET_CACHE_VERSION: &str = "cycle-packet-cache-v1";
21
/// The kinds of cycle packet that can be cached.
///
/// Each kind is stored in its own subdirectory of the cache (see
/// [`CyclePacketKind::dir_name`]). With `rename_all = "snake_case"` the variants
/// serialize as `evidence`, `context_pack`, `impact` and `conflict_matrix`; note that
/// the directory names use hyphens instead of underscores.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CyclePacketKind {
    /// Stored under the `evidence` directory.
    Evidence,
    /// Stored under the `context-pack` directory.
    ContextPack,
    /// Stored under the `impact` directory.
    Impact,
    /// Stored under the `conflict-matrix` directory.
    ConflictMatrix,
}
30
31impl CyclePacketKind {
32 pub fn dir_name(&self) -> &'static str {
33 match self {
34 CyclePacketKind::Evidence => "evidence",
35 CyclePacketKind::ContextPack => "context-pack",
36 CyclePacketKind::Impact => "impact",
37 CyclePacketKind::ConflictMatrix => "conflict-matrix",
38 }
39 }
40}
41
/// Serializable metadata record for one cached cycle packet.
///
/// NOTE(review): nothing outside the tests in this file constructs this type, so the
/// field descriptions below are inferred from names and test usage — confirm against
/// the real callers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CyclePacketCacheEntry {
    /// Cache version tag; the test in this file sets it to
    /// [`CYCLE_PACKET_CACHE_VERSION`].
    pub version: String,
    /// Kind of packet this entry describes.
    pub kind: CyclePacketKind,
    /// Presumably the cache key the entry is stored under — TODO confirm.
    pub key: String,
    /// Identifier of the packet (the tests use values such as `gevd:abc123`).
    pub packet_id: String,
    /// Source watermark recorded with the entry.
    pub source_watermark: String,
    /// Document watermark recorded with the entry.
    pub document_watermark: String,
    /// Staged-diff watermark recorded with the entry.
    pub staged_diff_watermark: String,
    /// Names of phases recorded as skipped.
    pub skipped_phases: Vec<String>,
    /// Presumably the compute time in microseconds — TODO confirm the unit with callers.
    pub compute_micros: u128,
}
54
/// Serializable report describing the outcome of a single cache lookup.
///
/// Built by [`build_cache_hit_report`], which copies its arguments into these fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CyclePacketCacheHitReport {
    /// Kind of packet that was looked up.
    pub kind: CyclePacketKind,
    /// Cache key passed to the report builder.
    pub key: String,
    /// Identifier of the packet.
    pub packet_id: String,
    /// Free-form status string supplied by the caller (the tests use `"disk_hit"`).
    pub hit_status: String,
    /// Names of phases recorded as skipped.
    pub skipped_phases: Vec<String>,
    /// Presumably the lookup time in microseconds — TODO confirm the unit with callers.
    pub lookup_micros: u128,
}
64
/// Returns the directory under `root` that holds the cycle packet cache.
///
/// The result is `root` with `.tsift/cycle-packet-cache` appended; nothing is
/// created on disk.
pub fn cycle_packet_cache_dir(root: &Path) -> PathBuf {
    let mut cache_dir = root.to_path_buf();
    cache_dir.push(".tsift/cycle-packet-cache");
    cache_dir
}
68
69pub fn cycle_packet_cache_path(root: &Path, kind: CyclePacketKind, key: &str) -> PathBuf {
70 cycle_packet_cache_dir(root)
71 .join(kind.dir_name())
72 .join(format!("{key}.json"))
73}
74
75pub fn cycle_packet_read_cache<T: for<'de> Deserialize<'de>>(
76 root: &Path,
77 kind: CyclePacketKind,
78 key: &str,
79) -> Option<T> {
80 let path = cycle_packet_cache_path(root, kind, key);
81 let bytes = fs::read(path).ok()?;
82 serde_json::from_slice(&bytes).ok()
83}
84
85pub fn cycle_packet_write_cache<T: Serialize>(
86 root: &Path,
87 kind: CyclePacketKind,
88 key: &str,
89 value: &T,
90) {
91 let path = cycle_packet_cache_path(root, kind, key);
92 let Some(parent) = path.parent() else {
93 return;
94 };
95 if fs::create_dir_all(parent).is_err() {
96 return;
97 }
98 if let Ok(bytes) = serde_json::to_vec(value) {
99 let _ = fs::write(path, bytes);
100 }
101}
102
103pub fn cycle_packet_watermark_key(
104 source_watermark: &str,
105 document_watermark: &str,
106 staged_diff_watermark: &str,
107 extra: &[&str],
108) -> String {
109 let mut parts = vec![
110 format!("version:{CYCLE_PACKET_CACHE_VERSION}"),
111 format!("source:{source_watermark}"),
112 format!("document:{document_watermark}"),
113 format!("staged_diff:{staged_diff_watermark}"),
114 ];
115 for e in extra {
116 parts.push(e.to_string());
117 }
118 blake3::hash(parts.join("\n").as_bytes())
119 .to_hex()
120 .to_string()
121}
122
123pub fn cycle_packet_evidence_key(packet_id: &str) -> String {
124 cycle_packet_watermark_key("evidence", "evidence", "evidence", &[packet_id])
125}
126
127pub fn build_cache_hit_report(
128 kind: CyclePacketKind,
129 key: &str,
130 packet_id: &str,
131 status: &str,
132 skipped_phases: &[&str],
133 lookup_micros: u128,
134) -> CyclePacketCacheHitReport {
135 CyclePacketCacheHitReport {
136 kind,
137 key: key.to_string(),
138 packet_id: packet_id.to_string(),
139 hit_status: status.to_string(),
140 skipped_phases: skipped_phases.iter().map(|s| s.to_string()).collect(),
141 lookup_micros,
142 }
143}
144
/// Default time-to-live for cache entries, in seconds (24 hours).
pub const CYCLE_PACKET_CACHE_DEFAULT_TTL_SECS: u64 = 24 * 60 * 60;
/// Default size budget for the cache, in bytes (50 MiB).
pub const CYCLE_PACKET_CACHE_DEFAULT_MAX_BYTES: u64 = 50 * 1024 * 1024;
147
/// Summary of one eviction pass, as returned by [`cycle_packet_cache_evict`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CyclePacketCacheEvictionReport {
    /// Number of `.json` cache files found when the pass scanned the cache.
    pub scanned_entries: usize,
    /// Number of entries evicted by this pass.
    pub evicted_entries: usize,
    /// Total size in bytes of the entries evicted by this pass.
    pub evicted_bytes: u64,
    /// Number of `.json` cache files found by a re-scan after eviction.
    pub remaining_entries: usize,
    /// Total size in bytes of the files found by the re-scan after eviction.
    pub remaining_bytes: u64,
    /// The TTL, in seconds, that the pass was called with.
    pub ttl_secs: u64,
    /// The byte budget that the pass was called with.
    pub max_bytes: u64,
}
158
159pub fn cycle_packet_cache_stats(root: &Path) -> (usize, u64) {
160 let cache_dir = cycle_packet_cache_dir(root);
161 if !cache_dir.exists() {
162 return (0, 0);
163 }
164 let mut count = 0usize;
165 let mut total_bytes = 0u64;
166 if let Ok(entries) = fs::read_dir(&cache_dir) {
167 for entry in entries.flatten() {
168 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
169 && let Ok(files) = fs::read_dir(entry.path())
170 {
171 for file in files.flatten() {
172 if file.path().extension().is_some_and(|ext| ext == "json")
173 && let Ok(meta) = file.metadata()
174 {
175 count += 1;
176 total_bytes += meta.len();
177 }
178 }
179 }
180 }
181 }
182 (count, total_bytes)
183}
184
185pub fn cycle_packet_cache_evict(
186 root: &Path,
187 ttl_secs: u64,
188 max_bytes: u64,
189) -> CyclePacketCacheEvictionReport {
190 let cache_dir = cycle_packet_cache_dir(root);
191 if !cache_dir.exists() {
192 return CyclePacketCacheEvictionReport {
193 scanned_entries: 0,
194 evicted_entries: 0,
195 evicted_bytes: 0,
196 remaining_entries: 0,
197 remaining_bytes: 0,
198 ttl_secs,
199 max_bytes,
200 };
201 }
202 let now = SystemTime::now();
203 let cutoff = now
204 .duration_since(SystemTime::UNIX_EPOCH)
205 .unwrap_or_default()
206 .as_secs()
207 .saturating_sub(ttl_secs);
208 let mut all_files: Vec<(PathBuf, u64, u64)> = Vec::new();
209 if let Ok(entries) = fs::read_dir(&cache_dir) {
210 for entry in entries.flatten() {
211 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
212 && let Ok(files) = fs::read_dir(entry.path())
213 {
214 for file in files.flatten() {
215 let path = file.path();
216 if path.extension().is_some_and(|ext| ext == "json")
217 && let Ok(meta) = file.metadata()
218 {
219 let size = meta.len();
220 let mtime_secs = meta
221 .modified()
222 .ok()
223 .and_then(|t| t.duration_since(SystemTime::UNIX_EPOCH).ok())
224 .map(|d| d.as_secs())
225 .unwrap_or(u64::MAX);
226 all_files.push((path, size, mtime_secs));
227 }
228 }
229 }
230 }
231 }
232 let scanned = all_files.len();
233 all_files.sort_by_key(|(_, _, mtime)| *mtime);
234 let mut evicted = 0usize;
235 let mut evicted_bytes = 0u64;
236 for (path, size, mtime) in &all_files {
237 if *mtime < cutoff {
238 let _ = fs::remove_file(path);
239 evicted += 1;
240 evicted_bytes += size;
241 }
242 }
243 let remaining: u64 = all_files
244 .iter()
245 .filter(|(_, _, mtime)| *mtime >= cutoff)
246 .map(|(_, size, _)| *size)
247 .sum();
248 if remaining > max_bytes {
249 let expired: Vec<_> = all_files
250 .iter()
251 .filter(|(_, _, mtime)| *mtime >= cutoff)
252 .collect();
253 let mut kept_bytes = 0u64;
254 for (path, size, _) in expired {
255 if kept_bytes.saturating_add(*size) > max_bytes {
256 let _ = fs::remove_file(path);
257 evicted += 1;
258 evicted_bytes += size;
259 } else {
260 kept_bytes = kept_bytes.saturating_add(*size);
261 }
262 }
263 }
264 let (remaining_count, remaining_bytes) = cycle_packet_cache_stats(root);
265 CyclePacketCacheEvictionReport {
266 scanned_entries: scanned,
267 evicted_entries: evicted,
268 evicted_bytes,
269 remaining_entries: remaining_count,
270 remaining_bytes,
271 ttl_secs,
272 max_bytes,
273 }
274}
275
// Unit tests for key derivation, path layout, disk round-tripping, stats and eviction.
// The filesystem tests each run against a fresh temporary directory.
#[cfg(test)]
mod tests {
    use super::*;

    // Same inputs must always hash to the same key.
    #[test]
    fn cycle_packet_watermark_key_is_stable() {
        let a = cycle_packet_watermark_key("s1", "d1", "sd1", &["extra"]);
        let b = cycle_packet_watermark_key("s1", "d1", "sd1", &["extra"]);
        assert_eq!(a, b);
    }

    // Changing only the source watermark must change the key.
    #[test]
    fn cycle_packet_watermark_key_differs_for_different_inputs() {
        let a = cycle_packet_watermark_key("s1", "d1", "sd1", &["extra"]);
        let b = cycle_packet_watermark_key("s2", "d1", "sd1", &["extra"]);
        assert_ne!(a, b);
    }

    // The evidence key is deterministic for a given packet id.
    #[test]
    fn cycle_packet_evidence_key_is_stable() {
        let a = cycle_packet_evidence_key("gevd:abc123");
        let b = cycle_packet_evidence_key("gevd:abc123");
        assert_eq!(a, b);
    }

    // Different packet ids must yield different evidence keys.
    #[test]
    fn cycle_packet_evidence_key_differs_for_different_ids() {
        let a = cycle_packet_evidence_key("gevd:abc123");
        let b = cycle_packet_evidence_key("gevd:def456");
        assert_ne!(a, b);
    }

    // The cache lives in `.tsift/cycle-packet-cache` below the root.
    #[test]
    fn cache_dir_uses_tsift_subdirectory() {
        let dir = cycle_packet_cache_dir(Path::new("/project"));
        assert_eq!(dir, PathBuf::from("/project/.tsift/cycle-packet-cache"));
    }

    // Entry paths are `<cache dir>/<kind dir>/<key>.json`.
    #[test]
    fn cache_path_includes_kind_and_key() {
        let path =
            cycle_packet_cache_path(Path::new("/project"), CyclePacketKind::Evidence, "abc123");
        assert_eq!(
            path,
            PathBuf::from("/project/.tsift/cycle-packet-cache/evidence/abc123.json")
        );
    }

    // Writing an entry and reading it back must preserve every field.
    #[test]
    fn disk_roundtrip_preserves_entry() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let entry = CyclePacketCacheEntry {
            version: CYCLE_PACKET_CACHE_VERSION.to_string(),
            kind: CyclePacketKind::Evidence,
            key: "test-key".to_string(),
            packet_id: "gevd:abc123".to_string(),
            source_watermark: "sw1".to_string(),
            document_watermark: "dw1".to_string(),
            staged_diff_watermark: "sdw1".to_string(),
            skipped_phases: vec!["graph_db_evidence".to_string()],
            compute_micros: 1234,
        };
        cycle_packet_write_cache(root, CyclePacketKind::Evidence, "test-key", &entry);
        let loaded: CyclePacketCacheEntry =
            cycle_packet_read_cache(root, CyclePacketKind::Evidence, "test-key").unwrap();
        // The entry type does not derive PartialEq, so compare field by field.
        assert_eq!(loaded.version, entry.version);
        assert_eq!(loaded.kind, entry.kind);
        assert_eq!(loaded.key, entry.key);
        assert_eq!(loaded.packet_id, entry.packet_id);
        assert_eq!(loaded.source_watermark, entry.source_watermark);
        assert_eq!(loaded.document_watermark, entry.document_watermark);
        assert_eq!(loaded.staged_diff_watermark, entry.staged_diff_watermark);
        assert_eq!(loaded.skipped_phases, entry.skipped_phases);
        assert_eq!(loaded.compute_micros, entry.compute_micros);
    }

    // A key that was never written reads back as `None`.
    #[test]
    fn disk_read_returns_none_for_missing() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let result: Option<CyclePacketCacheEntry> =
            cycle_packet_read_cache(root, CyclePacketKind::Evidence, "nonexistent");
        assert!(result.is_none());
    }

    // Pins the exact directory name of every kind.
    #[test]
    fn kind_dir_names_are_lowercase_with_hyphens() {
        assert_eq!(CyclePacketKind::Evidence.dir_name(), "evidence");
        assert_eq!(CyclePacketKind::ContextPack.dir_name(), "context-pack");
        assert_eq!(CyclePacketKind::Impact.dir_name(), "impact");
        assert_eq!(
            CyclePacketKind::ConflictMatrix.dir_name(),
            "conflict-matrix"
        );
    }

    // The report builder copies each argument into the matching field.
    #[test]
    fn build_cache_hit_report_captures_fields() {
        let report = build_cache_hit_report(
            CyclePacketKind::Evidence,
            "key1",
            "gevd:abc",
            "disk_hit",
            &["phase_a", "phase_b"],
            500,
        );
        assert_eq!(report.kind, CyclePacketKind::Evidence);
        assert_eq!(report.key, "key1");
        assert_eq!(report.packet_id, "gevd:abc");
        assert_eq!(report.hit_status, "disk_hit");
        assert_eq!(report.skipped_phases, vec!["phase_a", "phase_b"]);
        assert_eq!(report.lookup_micros, 500);
    }

    // With no cache directory on disk, stats are (0, 0).
    #[test]
    fn cache_stats_returns_zero_for_missing_dir() {
        let dir = tempfile::tempdir().unwrap();
        let (count, bytes) = cycle_packet_cache_stats(dir.path());
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    // Stats count entries across different kind directories.
    #[test]
    fn cache_stats_counts_entries() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        cycle_packet_write_cache(
            root,
            CyclePacketKind::Evidence,
            "key1",
            &serde_json::json!({"test": 1}),
        );
        cycle_packet_write_cache(
            root,
            CyclePacketKind::Evidence,
            "key2",
            &serde_json::json!({"test": 2}),
        );
        cycle_packet_write_cache(
            root,
            CyclePacketKind::ContextPack,
            "key3",
            &serde_json::json!({"test": 3}),
        );
        let (count, bytes) = cycle_packet_cache_stats(root);
        assert_eq!(count, 3);
        assert!(bytes > 0);
    }

    // An entry whose mtime is older than the TTL is removed; a fresh one survives.
    #[test]
    fn evict_removes_expired_entries() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let old_entry = serde_json::json!({"old": true});
        cycle_packet_write_cache(root, CyclePacketKind::Evidence, "old-key", &old_entry);
        let old_path = cycle_packet_cache_path(root, CyclePacketKind::Evidence, "old-key");
        // Backdate the file by two hours so it falls outside the one-hour TTL below.
        let old_time = std::time::SystemTime::now() - std::time::Duration::from_secs(7200);
        let file_time = filetime::FileTime::from_system_time(old_time);
        filetime::set_file_mtime(&old_path, file_time).unwrap();

        let new_entry = serde_json::json!({"new": true});
        cycle_packet_write_cache(root, CyclePacketKind::Evidence, "new-key", &new_entry);

        // The byte budget is large enough that only the TTL can cause eviction here.
        let report = cycle_packet_cache_evict(root, 3600, 1024 * 1024 * 1024);
        assert_eq!(report.evicted_entries, 1);
        assert_eq!(report.remaining_entries, 1);
        assert!(
            cycle_packet_read_cache::<serde_json::Value>(
                root,
                CyclePacketKind::Evidence,
                "old-key"
            )
            .is_none(),
            "old entry should be evicted"
        );
        assert!(
            cycle_packet_read_cache::<serde_json::Value>(
                root,
                CyclePacketKind::Evidence,
                "new-key"
            )
            .is_some(),
            "new entry should survive"
        );
    }

    // When the cache exceeds the byte budget, eviction must shrink it.
    #[test]
    fn evict_enforces_max_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        for i in 0..5 {
            let data = serde_json::json!({"payload": "x".repeat(200), "idx": i});
            cycle_packet_write_cache(root, CyclePacketKind::Evidence, &format!("key-{i}"), &data);
        }
        let (count, bytes) = cycle_packet_cache_stats(root);
        assert_eq!(count, 5);
        assert!(bytes > 500);
        let max_bytes = 500u64;
        // A TTL of 0 puts the cutoff at the current second; the assertions below hold
        // whether entries are removed by the TTL stage or by the byte budget.
        let report = cycle_packet_cache_evict(root, 0, max_bytes);
        assert!(
            report.evicted_entries > 0,
            "expected evictions for max_bytes={max_bytes}, got {report:?}"
        );
        let (_, remaining_bytes) = cycle_packet_cache_stats(root);
        // Allows 300 bytes of slack above the budget.
        assert!(
            remaining_bytes <= max_bytes + 300,
            "remaining bytes should be near max_bytes, got {remaining_bytes}"
        );
    }

    // Evicting when no cache directory exists scans and evicts nothing.
    #[test]
    fn evict_noop_on_empty_cache() {
        let dir = tempfile::tempdir().unwrap();
        let report = cycle_packet_cache_evict(dir.path(), 3600, 1024);
        assert_eq!(report.scanned_entries, 0);
        assert_eq!(report.evicted_entries, 0);
    }
}