Skip to main content

tsift_cache/
cycle_packet_cache.rs

//! Cycle-scoped packet reuse cache (#gpackreuse).
//!
//! Caches graph/context packets across a single agent-doc cycle so that
//! repeated tsift CLI invocations sharing the same source/document/staged-diff
//! watermarks skip redundant computation and report stable packet ids.
//!
//! Packet kinds:
//! - `evidence`: graph-db evidence reports keyed by `packet_id`
//! - `context_pack`: context-pack reports keyed by the (source, document, staged-diff) watermark triple
//! - `impact`: impact reports keyed by source watermark + revision
//! - `conflict_matrix`: conflict-matrix reports keyed by prepared inputs + targets
//!
//! Spec: see specs/graph.md § "Cycle Packet Cache".
14
15use serde::{Deserialize, Serialize};
16use std::fs;
17use std::path::{Path, PathBuf};
18use std::time::SystemTime;
19
20pub const CYCLE_PACKET_CACHE_VERSION: &str = "cycle-packet-cache-v1";
21
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(rename_all = "snake_case")]
24pub enum CyclePacketKind {
25    Evidence,
26    ContextPack,
27    Impact,
28    ConflictMatrix,
29}
30
31impl CyclePacketKind {
32    pub fn dir_name(&self) -> &'static str {
33        match self {
34            CyclePacketKind::Evidence => "evidence",
35            CyclePacketKind::ContextPack => "context-pack",
36            CyclePacketKind::Impact => "impact",
37            CyclePacketKind::ConflictMatrix => "conflict-matrix",
38        }
39    }
40}
41
/// A cached packet record together with the watermarks it was computed against.
///
/// Field order is the serialized JSON field order; keep it stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CyclePacketCacheEntry {
    /// Cache format version; presumably `CYCLE_PACKET_CACHE_VERSION` at write time — TODO confirm with writers.
    pub version: String,
    /// Which packet kind this entry holds.
    pub kind: CyclePacketKind,
    /// Cache key the entry is stored under.
    pub key: String,
    /// Identifier of the cached packet (e.g. `gevd:…` for evidence packets).
    pub packet_id: String,
    /// Source watermark the packet was computed against.
    pub source_watermark: String,
    /// Document watermark the packet was computed against.
    pub document_watermark: String,
    /// Staged-diff watermark the packet was computed against.
    pub staged_diff_watermark: String,
    /// Names of computation phases a hit on this entry lets the caller skip (presumably; verify against callers).
    pub skipped_phases: Vec<String>,
    /// NOTE(review): by name, time spent computing the packet in microseconds — confirm with writers.
    pub compute_micros: u128,
}
54
/// Describes the outcome of one cache lookup; built by `build_cache_hit_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CyclePacketCacheHitReport {
    /// Packet kind that was looked up.
    pub kind: CyclePacketKind,
    /// Cache key that was looked up.
    pub key: String,
    /// Identifier of the packet involved in the lookup.
    pub packet_id: String,
    /// Free-form lookup status string (e.g. `"disk_hit"`).
    pub hit_status: String,
    /// Names of phases the caller skipped thanks to this lookup.
    pub skipped_phases: Vec<String>,
    /// NOTE(review): by name, lookup duration in microseconds — confirm with callers.
    pub lookup_micros: u128,
}
64
/// Root directory of the cycle packet cache for the project at `root`
/// (`<root>/.tsift/cycle-packet-cache`).
pub fn cycle_packet_cache_dir(root: &Path) -> PathBuf {
    const CACHE_SUBDIR: &str = ".tsift/cycle-packet-cache";
    root.join(CACHE_SUBDIR)
}
68
69pub fn cycle_packet_cache_path(root: &Path, kind: CyclePacketKind, key: &str) -> PathBuf {
70    cycle_packet_cache_dir(root)
71        .join(kind.dir_name())
72        .join(format!("{key}.json"))
73}
74
75pub fn cycle_packet_read_cache<T: for<'de> Deserialize<'de>>(
76    root: &Path,
77    kind: CyclePacketKind,
78    key: &str,
79) -> Option<T> {
80    let path = cycle_packet_cache_path(root, kind, key);
81    let bytes = fs::read(path).ok()?;
82    serde_json::from_slice(&bytes).ok()
83}
84
85pub fn cycle_packet_write_cache<T: Serialize>(
86    root: &Path,
87    kind: CyclePacketKind,
88    key: &str,
89    value: &T,
90) {
91    let path = cycle_packet_cache_path(root, kind, key);
92    let Some(parent) = path.parent() else {
93        return;
94    };
95    if fs::create_dir_all(parent).is_err() {
96        return;
97    }
98    if let Ok(bytes) = serde_json::to_vec(value) {
99        let _ = fs::write(path, bytes);
100    }
101}
102
103pub fn cycle_packet_watermark_key(
104    source_watermark: &str,
105    document_watermark: &str,
106    staged_diff_watermark: &str,
107    extra: &[&str],
108) -> String {
109    let mut parts = vec![
110        format!("version:{CYCLE_PACKET_CACHE_VERSION}"),
111        format!("source:{source_watermark}"),
112        format!("document:{document_watermark}"),
113        format!("staged_diff:{staged_diff_watermark}"),
114    ];
115    for e in extra {
116        parts.push(e.to_string());
117    }
118    blake3::hash(parts.join("\n").as_bytes())
119        .to_hex()
120        .to_string()
121}
122
123pub fn cycle_packet_evidence_key(packet_id: &str) -> String {
124    cycle_packet_watermark_key("evidence", "evidence", "evidence", &[packet_id])
125}
126
127pub fn build_cache_hit_report(
128    kind: CyclePacketKind,
129    key: &str,
130    packet_id: &str,
131    status: &str,
132    skipped_phases: &[&str],
133    lookup_micros: u128,
134) -> CyclePacketCacheHitReport {
135    CyclePacketCacheHitReport {
136        kind,
137        key: key.to_string(),
138        packet_id: packet_id.to_string(),
139        hit_status: status.to_string(),
140        skipped_phases: skipped_phases.iter().map(|s| s.to_string()).collect(),
141        lookup_micros,
142    }
143}
144
/// Default entry time-to-live in seconds (one day); presumably meant as the `ttl_secs`
/// argument of `cycle_packet_cache_evict`.
pub const CYCLE_PACKET_CACHE_DEFAULT_TTL_SECS: u64 = 60 * 60 * 24;
/// Default total cache size budget in bytes (50 MiB); presumably meant as the
/// `max_bytes` argument of `cycle_packet_cache_evict`.
pub const CYCLE_PACKET_CACHE_DEFAULT_MAX_BYTES: u64 = 50 << 20;
147
/// Summary of one `cycle_packet_cache_evict` run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CyclePacketCacheEvictionReport {
    /// Number of `*.json` entry files found before eviction.
    pub scanned_entries: usize,
    /// Number of entry files evicted by this run.
    pub evicted_entries: usize,
    /// Total size in bytes of the evicted entry files.
    pub evicted_bytes: u64,
    /// Entry files counted by a rescan after eviction.
    pub remaining_entries: usize,
    /// Total size in bytes of the entry files counted by that rescan.
    pub remaining_bytes: u64,
    /// TTL in seconds that the run was invoked with.
    pub ttl_secs: u64,
    /// Size budget in bytes that the run was invoked with.
    pub max_bytes: u64,
}
158
159pub fn cycle_packet_cache_stats(root: &Path) -> (usize, u64) {
160    let cache_dir = cycle_packet_cache_dir(root);
161    if !cache_dir.exists() {
162        return (0, 0);
163    }
164    let mut count = 0usize;
165    let mut total_bytes = 0u64;
166    if let Ok(entries) = fs::read_dir(&cache_dir) {
167        for entry in entries.flatten() {
168            if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
169                && let Ok(files) = fs::read_dir(entry.path())
170            {
171                for file in files.flatten() {
172                    if file.path().extension().is_some_and(|ext| ext == "json")
173                        && let Ok(meta) = file.metadata()
174                    {
175                        count += 1;
176                        total_bytes += meta.len();
177                    }
178                }
179            }
180        }
181    }
182    (count, total_bytes)
183}
184
185pub fn cycle_packet_cache_evict(
186    root: &Path,
187    ttl_secs: u64,
188    max_bytes: u64,
189) -> CyclePacketCacheEvictionReport {
190    let cache_dir = cycle_packet_cache_dir(root);
191    if !cache_dir.exists() {
192        return CyclePacketCacheEvictionReport {
193            scanned_entries: 0,
194            evicted_entries: 0,
195            evicted_bytes: 0,
196            remaining_entries: 0,
197            remaining_bytes: 0,
198            ttl_secs,
199            max_bytes,
200        };
201    }
202    let now = SystemTime::now();
203    let cutoff = now
204        .duration_since(SystemTime::UNIX_EPOCH)
205        .unwrap_or_default()
206        .as_secs()
207        .saturating_sub(ttl_secs);
208    let mut all_files: Vec<(PathBuf, u64, u64)> = Vec::new();
209    if let Ok(entries) = fs::read_dir(&cache_dir) {
210        for entry in entries.flatten() {
211            if entry.file_type().map(|t| t.is_dir()).unwrap_or(false)
212                && let Ok(files) = fs::read_dir(entry.path())
213            {
214                for file in files.flatten() {
215                    let path = file.path();
216                    if path.extension().is_some_and(|ext| ext == "json")
217                        && let Ok(meta) = file.metadata()
218                    {
219                        let size = meta.len();
220                        let mtime_secs = meta
221                            .modified()
222                            .ok()
223                            .and_then(|t| t.duration_since(SystemTime::UNIX_EPOCH).ok())
224                            .map(|d| d.as_secs())
225                            .unwrap_or(u64::MAX);
226                        all_files.push((path, size, mtime_secs));
227                    }
228                }
229            }
230        }
231    }
232    let scanned = all_files.len();
233    all_files.sort_by_key(|(_, _, mtime)| *mtime);
234    let mut evicted = 0usize;
235    let mut evicted_bytes = 0u64;
236    for (path, size, mtime) in &all_files {
237        if *mtime < cutoff {
238            let _ = fs::remove_file(path);
239            evicted += 1;
240            evicted_bytes += size;
241        }
242    }
243    let remaining: u64 = all_files
244        .iter()
245        .filter(|(_, _, mtime)| *mtime >= cutoff)
246        .map(|(_, size, _)| *size)
247        .sum();
248    if remaining > max_bytes {
249        let expired: Vec<_> = all_files
250            .iter()
251            .filter(|(_, _, mtime)| *mtime >= cutoff)
252            .collect();
253        let mut kept_bytes = 0u64;
254        for (path, size, _) in expired {
255            if kept_bytes.saturating_add(*size) > max_bytes {
256                let _ = fs::remove_file(path);
257                evicted += 1;
258                evicted_bytes += size;
259            } else {
260                kept_bytes = kept_bytes.saturating_add(*size);
261            }
262        }
263    }
264    let (remaining_count, remaining_bytes) = cycle_packet_cache_stats(root);
265    CyclePacketCacheEvictionReport {
266        scanned_entries: scanned,
267        evicted_entries: evicted,
268        evicted_bytes,
269        remaining_entries: remaining_count,
270        remaining_bytes,
271        ttl_secs,
272        max_bytes,
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cycle_packet_watermark_key_is_stable() {
        let a = cycle_packet_watermark_key("s1", "d1", "sd1", &["extra"]);
        let b = cycle_packet_watermark_key("s1", "d1", "sd1", &["extra"]);
        assert_eq!(a, b);
    }

    #[test]
    fn cycle_packet_watermark_key_differs_for_different_inputs() {
        let a = cycle_packet_watermark_key("s1", "d1", "sd1", &["extra"]);
        let b = cycle_packet_watermark_key("s2", "d1", "sd1", &["extra"]);
        assert_ne!(a, b);
    }

    #[test]
    fn cycle_packet_watermark_key_depends_on_extra() {
        let without_extra = cycle_packet_watermark_key("s1", "d1", "sd1", &[]);
        let with_extra = cycle_packet_watermark_key("s1", "d1", "sd1", &["extra"]);
        assert_ne!(without_extra, with_extra);
    }

    #[test]
    fn cycle_packet_evidence_key_is_stable() {
        let a = cycle_packet_evidence_key("gevd:abc123");
        let b = cycle_packet_evidence_key("gevd:abc123");
        assert_eq!(a, b);
    }

    #[test]
    fn cycle_packet_evidence_key_differs_for_different_ids() {
        let a = cycle_packet_evidence_key("gevd:abc123");
        let b = cycle_packet_evidence_key("gevd:def456");
        assert_ne!(a, b);
    }

    #[test]
    fn cache_dir_uses_tsift_subdirectory() {
        let dir = cycle_packet_cache_dir(Path::new("/project"));
        assert_eq!(dir, PathBuf::from("/project/.tsift/cycle-packet-cache"));
    }

    #[test]
    fn cache_path_includes_kind_and_key() {
        let path =
            cycle_packet_cache_path(Path::new("/project"), CyclePacketKind::Evidence, "abc123");
        assert_eq!(
            path,
            PathBuf::from("/project/.tsift/cycle-packet-cache/evidence/abc123.json")
        );
    }

    #[test]
    fn disk_roundtrip_preserves_entry() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let entry = CyclePacketCacheEntry {
            version: CYCLE_PACKET_CACHE_VERSION.to_string(),
            kind: CyclePacketKind::Evidence,
            key: "test-key".to_string(),
            packet_id: "gevd:abc123".to_string(),
            source_watermark: "sw1".to_string(),
            document_watermark: "dw1".to_string(),
            staged_diff_watermark: "sdw1".to_string(),
            skipped_phases: vec!["graph_db_evidence".to_string()],
            compute_micros: 1234,
        };
        cycle_packet_write_cache(root, CyclePacketKind::Evidence, "test-key", &entry);
        let loaded: CyclePacketCacheEntry =
            cycle_packet_read_cache(root, CyclePacketKind::Evidence, "test-key").unwrap();
        assert_eq!(loaded.version, entry.version);
        assert_eq!(loaded.kind, entry.kind);
        assert_eq!(loaded.key, entry.key);
        assert_eq!(loaded.packet_id, entry.packet_id);
        assert_eq!(loaded.source_watermark, entry.source_watermark);
        assert_eq!(loaded.document_watermark, entry.document_watermark);
        assert_eq!(loaded.staged_diff_watermark, entry.staged_diff_watermark);
        assert_eq!(loaded.skipped_phases, entry.skipped_phases);
        assert_eq!(loaded.compute_micros, entry.compute_micros);
    }

    #[test]
    fn disk_write_overwrites_existing_entry() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        cycle_packet_write_cache(root, CyclePacketKind::Impact, "k", &serde_json::json!({"v": 1}));
        cycle_packet_write_cache(root, CyclePacketKind::Impact, "k", &serde_json::json!({"v": 2}));
        let loaded: serde_json::Value =
            cycle_packet_read_cache(root, CyclePacketKind::Impact, "k").unwrap();
        assert_eq!(loaded, serde_json::json!({"v": 2}));
        // Exactly one entry file: rewriting a key must not leave extra `.json` files.
        assert_eq!(cycle_packet_cache_stats(root).0, 1);
    }

    #[test]
    fn disk_read_returns_none_for_missing() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let result: Option<CyclePacketCacheEntry> =
            cycle_packet_read_cache(root, CyclePacketKind::Evidence, "nonexistent");
        assert!(result.is_none());
    }

    #[test]
    fn disk_read_returns_none_for_corrupt_entry() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let path = cycle_packet_cache_path(root, CyclePacketKind::ConflictMatrix, "bad");
        fs::create_dir_all(path.parent().unwrap()).unwrap();
        fs::write(&path, b"{not json").unwrap();
        let result: Option<serde_json::Value> =
            cycle_packet_read_cache(root, CyclePacketKind::ConflictMatrix, "bad");
        assert!(result.is_none(), "corrupt entries must read as a cache miss");
    }

    #[test]
    fn kind_dir_names_are_lowercase_with_hyphens() {
        assert_eq!(CyclePacketKind::Evidence.dir_name(), "evidence");
        assert_eq!(CyclePacketKind::ContextPack.dir_name(), "context-pack");
        assert_eq!(CyclePacketKind::Impact.dir_name(), "impact");
        assert_eq!(
            CyclePacketKind::ConflictMatrix.dir_name(),
            "conflict-matrix"
        );
    }

    #[test]
    fn build_cache_hit_report_captures_fields() {
        let report = build_cache_hit_report(
            CyclePacketKind::Evidence,
            "key1",
            "gevd:abc",
            "disk_hit",
            &["phase_a", "phase_b"],
            500,
        );
        assert_eq!(report.kind, CyclePacketKind::Evidence);
        assert_eq!(report.key, "key1");
        assert_eq!(report.packet_id, "gevd:abc");
        assert_eq!(report.hit_status, "disk_hit");
        assert_eq!(report.skipped_phases, vec!["phase_a", "phase_b"]);
        assert_eq!(report.lookup_micros, 500);
    }

    #[test]
    fn cache_stats_returns_zero_for_missing_dir() {
        let dir = tempfile::tempdir().unwrap();
        let (count, bytes) = cycle_packet_cache_stats(dir.path());
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn cache_stats_counts_entries() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        cycle_packet_write_cache(
            root,
            CyclePacketKind::Evidence,
            "key1",
            &serde_json::json!({"test": 1}),
        );
        cycle_packet_write_cache(
            root,
            CyclePacketKind::Evidence,
            "key2",
            &serde_json::json!({"test": 2}),
        );
        cycle_packet_write_cache(
            root,
            CyclePacketKind::ContextPack,
            "key3",
            &serde_json::json!({"test": 3}),
        );
        let (count, bytes) = cycle_packet_cache_stats(root);
        assert_eq!(count, 3);
        assert!(bytes > 0);
    }

    #[test]
    fn evict_removes_expired_entries() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let old_entry = serde_json::json!({"old": true});
        cycle_packet_write_cache(root, CyclePacketKind::Evidence, "old-key", &old_entry);
        let old_path = cycle_packet_cache_path(root, CyclePacketKind::Evidence, "old-key");
        // Backdate the entry two hours so it falls outside the one-hour TTL below.
        let old_time = std::time::SystemTime::now() - std::time::Duration::from_secs(7200);
        let file_time = filetime::FileTime::from_system_time(old_time);
        filetime::set_file_mtime(&old_path, file_time).unwrap();

        let new_entry = serde_json::json!({"new": true});
        cycle_packet_write_cache(root, CyclePacketKind::Evidence, "new-key", &new_entry);

        let report = cycle_packet_cache_evict(root, 3600, 1024 * 1024 * 1024);
        assert_eq!(report.evicted_entries, 1);
        assert_eq!(report.remaining_entries, 1);
        assert!(
            cycle_packet_read_cache::<serde_json::Value>(
                root,
                CyclePacketKind::Evidence,
                "old-key"
            )
            .is_none(),
            "old entry should be evicted"
        );
        assert!(
            cycle_packet_read_cache::<serde_json::Value>(
                root,
                CyclePacketKind::Evidence,
                "new-key"
            )
            .is_some(),
            "new entry should survive"
        );
    }

    #[test]
    fn evict_enforces_max_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        for i in 0..5 {
            let data = serde_json::json!({"payload": "x".repeat(200), "idx": i});
            cycle_packet_write_cache(root, CyclePacketKind::Evidence, &format!("key-{i}"), &data);
        }
        let (count, bytes) = cycle_packet_cache_stats(root);
        assert_eq!(count, 5);
        assert!(bytes > 500);
        let max_bytes = 500u64;
        let report = cycle_packet_cache_evict(root, 0, max_bytes);
        assert!(
            report.evicted_entries > 0,
            "expected evictions for max_bytes={max_bytes}, got {report:?}"
        );
        let (_, remaining_bytes) = cycle_packet_cache_stats(root);
        // Eviction must bring the cache within budget, not merely close to it.
        assert!(
            remaining_bytes <= max_bytes,
            "remaining bytes should not exceed max_bytes={max_bytes}, got {remaining_bytes}"
        );
        assert_eq!(report.remaining_bytes, remaining_bytes);
    }

    #[test]
    fn evict_noop_on_empty_cache() {
        let dir = tempfile::tempdir().unwrap();
        let report = cycle_packet_cache_evict(dir.path(), 3600, 1024);
        assert_eq!(report.scanned_entries, 0);
        assert_eq!(report.evicted_entries, 0);
        assert_eq!(report.remaining_entries, 0);
        assert_eq!(report.remaining_bytes, 0);
    }
}