lean_ctx/server/
compaction_sync.rs1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use crate::core::cache::SessionCache;
5use crate::core::context_radar::RadarEvent;
6
/// Timestamp of the newest compaction event that `sync_if_compacted` has
/// already acted on. Starts at 0, i.e. no compaction has been recorded yet.
7pub static LAST_COMPACTION_TS: AtomicU64 = AtomicU64::new(0);
8
9pub fn effective_cache_policy() -> &'static str {
11 static POLICY: std::sync::OnceLock<String> = std::sync::OnceLock::new();
12 POLICY.get_or_init(|| {
13 if let Ok(v) = std::env::var("LEAN_CTX_CACHE_POLICY") {
14 let v = v.trim().to_lowercase();
15 if matches!(v.as_str(), "aggressive" | "safe" | "off") {
16 return v;
17 }
18 }
19 let cfg = crate::core::config::Config::load();
20 cfg.cache_policy
21 .as_deref()
22 .unwrap_or("aggressive")
23 .to_lowercase()
24 })
25}
26
27pub fn sync_if_compacted(cache: &mut SessionCache, data_dir: &Path) -> bool {
31 let last_seen = LAST_COMPACTION_TS.load(Ordering::Relaxed);
32 let radar_path = data_dir.join("context_radar.jsonl");
33
34 if !radar_path.exists() {
35 return false;
36 }
37
38 let Some(latest_compaction_ts) = find_latest_compaction(&radar_path, last_seen) else {
39 return false;
40 };
41
42 LAST_COMPACTION_TS.store(latest_compaction_ts, Ordering::Relaxed);
43 let reset_count = cache.reset_delivery_flags();
44 if reset_count > 0 {
45 eprintln!(
46 "[lean-ctx] compaction detected — reset {reset_count} delivery flags for re-read"
47 );
48 }
49 true
50}
51
52fn find_latest_compaction(radar_path: &Path, since_ts: u64) -> Option<u64> {
55 use std::io::{Read, Seek, SeekFrom};
56
57 let mut file = std::fs::File::open(radar_path).ok()?;
58 let file_len = file.metadata().ok()?.len();
59
60 const TAIL_BYTES: u64 = 4096;
61 let content = if file_len <= TAIL_BYTES {
62 let mut s = String::new();
63 file.read_to_string(&mut s).ok()?;
64 s
65 } else {
66 file.seek(SeekFrom::End(-(TAIL_BYTES as i64))).ok()?;
67 let mut buf = vec![0u8; TAIL_BYTES as usize];
68 let n = file.read(&mut buf).ok()?;
69 let s = String::from_utf8_lossy(&buf[..n]).into_owned();
70 if let Some(idx) = s.find('\n') {
72 s[idx + 1..].to_string()
73 } else {
74 s
75 }
76 };
77
78 for line in content.lines().rev() {
79 if line.is_empty() {
80 continue;
81 }
82 let event: RadarEvent = match serde_json::from_str(line) {
83 Ok(e) => e,
84 Err(_) => continue,
85 };
86 if event.ts <= since_ts {
87 break;
88 }
89 if event.event_type == "compaction" {
90 return Some(event.ts);
91 }
92 }
93 None
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::io::Write;
    use tempfile::TempDir;

    /// Builds a cache in which every given path is stored and marked as
    /// fully delivered.
    fn make_cache_with_delivered(paths: &[&str]) -> SessionCache {
        let mut cache = SessionCache::default();
        paths.iter().for_each(|p| {
            cache.store(p, "hello world");
            cache.mark_full_delivered(p);
        });
        cache
    }

    /// Creates an empty `context_radar.jsonl` inside `dir` and returns the
    /// open handle; the caller writes events and drops it to close the file.
    fn create_radar(dir: &TempDir) -> std::fs::File {
        std::fs::File::create(dir.path().join("context_radar.jsonl")).unwrap()
    }

    // The tests share the global `LAST_COMPACTION_TS`, hence `#[serial]` and
    // the explicit reset to 0 before each scenario.

    #[test]
    #[serial]
    fn no_reset_without_compaction_event() {
        let tmp = TempDir::new().unwrap();
        {
            let mut log = create_radar(&tmp);
            writeln!(log, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
        }

        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);

        let synced = sync_if_compacted(&mut cache, tmp.path());
        assert!(!synced);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }

    #[test]
    #[serial]
    fn resets_after_compaction() {
        let tmp = TempDir::new().unwrap();
        {
            let mut log = create_radar(&tmp);
            writeln!(log, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
            writeln!(log, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
        }

        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
        let mut cache = make_cache_with_delivered(&["/tmp/a.rs", "/tmp/b.rs"]);
        assert!(cache.is_full_delivered("/tmp/a.rs"));

        let synced = sync_if_compacted(&mut cache, tmp.path());
        assert!(synced);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));
        assert!(!cache.is_full_delivered("/tmp/b.rs"));
    }

    #[test]
    #[serial]
    fn does_not_double_reset() {
        let tmp = TempDir::new().unwrap();
        {
            let mut log = create_radar(&tmp);
            writeln!(log, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
        }

        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);

        // First pass sees the compaction and clears the flag.
        let first_pass = sync_if_compacted(&mut cache, tmp.path());
        assert!(first_pass);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));

        // Second pass over the same log must leave a re-delivered file alone.
        cache.mark_full_delivered("/tmp/a.rs");
        let second_pass = sync_if_compacted(&mut cache, tmp.path());
        assert!(!second_pass);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }
}
164}