lean_ctx/server/
compaction_sync.rs1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use crate::core::cache::SessionCache;
5use crate::core::context_radar::RadarEvent;
6
7pub static LAST_COMPACTION_TS: AtomicU64 = AtomicU64::new(0);
8
9pub fn effective_cache_policy() -> &'static str {
11 static POLICY: std::sync::OnceLock<String> = std::sync::OnceLock::new();
12 POLICY.get_or_init(|| {
13 if let Ok(v) = std::env::var("LEAN_CTX_CACHE_POLICY") {
14 let v = v.trim().to_lowercase();
15 if matches!(v.as_str(), "aggressive" | "safe" | "off") {
16 return v;
17 }
18 }
19 let cfg = crate::core::config::Config::load();
20 cfg.cache_policy
21 .as_deref()
22 .unwrap_or("aggressive")
23 .to_lowercase()
24 })
25}
26
27pub fn sync_if_compacted(cache: &mut SessionCache, data_dir: &Path) -> bool {
31 let last_seen = LAST_COMPACTION_TS.load(Ordering::Relaxed);
32 let radar_path = data_dir.join("context_radar.jsonl");
33
34 if !radar_path.exists() {
35 return false;
36 }
37
38 let Some(latest_compaction_ts) = find_latest_compaction(&radar_path, last_seen) else {
39 return false;
40 };
41
42 LAST_COMPACTION_TS.store(latest_compaction_ts, Ordering::Relaxed);
43 let reset_count = cache.reset_delivery_flags();
44 if reset_count > 0 {
45 eprintln!(
46 "[lean-ctx] compaction detected — reset {reset_count} delivery flags for re-read"
47 );
48 }
49
50 std::thread::spawn(|| {
51 if let Some(session) = crate::core::session::SessionState::load_latest() {
52 if let Some(ref root) = session.project_root {
53 if !session.findings.is_empty() || !session.decisions.is_empty() {
54 crate::tools::startup::auto_consolidate_knowledge(root);
55 }
56 }
57 }
58 });
59
60 true
61}
62
63fn find_latest_compaction(radar_path: &Path, since_ts: u64) -> Option<u64> {
66 use std::io::{Read, Seek, SeekFrom};
67
68 let mut file = std::fs::File::open(radar_path).ok()?;
69 let file_len = file.metadata().ok()?.len();
70
71 const TAIL_BYTES: u64 = 4096;
72 let content = if file_len <= TAIL_BYTES {
73 let mut s = String::new();
74 file.read_to_string(&mut s).ok()?;
75 s
76 } else {
77 file.seek(SeekFrom::End(-(TAIL_BYTES as i64))).ok()?;
78 let mut buf = vec![0u8; TAIL_BYTES as usize];
79 let n = file.read(&mut buf).ok()?;
80 let s = String::from_utf8_lossy(&buf[..n]).into_owned();
81 if let Some(idx) = s.find('\n') {
83 s[idx + 1..].to_string()
84 } else {
85 s
86 }
87 };
88
89 for line in content.lines().rev() {
90 if line.is_empty() {
91 continue;
92 }
93 let event: RadarEvent = match serde_json::from_str(line) {
94 Ok(e) => e,
95 Err(_) => continue,
96 };
97 if event.ts <= since_ts {
98 break;
99 }
100 if event.event_type == "compaction" {
101 return Some(event.ts);
102 }
103 }
104 None
105}
106
107#[cfg(test)]
108mod tests {
109 use super::*;
110 use serial_test::serial;
111 use std::io::Write;
112 use tempfile::TempDir;
113
114 fn make_cache_with_delivered(paths: &[&str]) -> SessionCache {
115 let mut cache = SessionCache::default();
116 for p in paths {
117 cache.store(p, "hello world");
118 cache.mark_full_delivered(p);
119 }
120 cache
121 }
122
123 #[test]
124 #[serial]
125 fn no_reset_without_compaction_event() {
126 let dir = TempDir::new().unwrap();
127 let radar = dir.path().join("context_radar.jsonl");
128 let mut f = std::fs::File::create(&radar).unwrap();
129 writeln!(f, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
130 drop(f);
131
132 LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
133 let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);
134 assert!(!sync_if_compacted(&mut cache, dir.path()));
135 assert!(cache.is_full_delivered("/tmp/a.rs"));
136 }
137
138 #[test]
139 #[serial]
140 fn resets_after_compaction() {
141 let dir = TempDir::new().unwrap();
142 let radar = dir.path().join("context_radar.jsonl");
143 let mut f = std::fs::File::create(&radar).unwrap();
144 writeln!(f, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
145 writeln!(f, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
146 drop(f);
147
148 LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
149 let mut cache = make_cache_with_delivered(&["/tmp/a.rs", "/tmp/b.rs"]);
150
151 assert!(cache.is_full_delivered("/tmp/a.rs"));
152 assert!(sync_if_compacted(&mut cache, dir.path()));
153 assert!(!cache.is_full_delivered("/tmp/a.rs"));
154 assert!(!cache.is_full_delivered("/tmp/b.rs"));
155 }
156
157 #[test]
158 #[serial]
159 fn does_not_double_reset() {
160 let dir = TempDir::new().unwrap();
161 let radar = dir.path().join("context_radar.jsonl");
162 let mut f = std::fs::File::create(&radar).unwrap();
163 writeln!(f, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
164 drop(f);
165
166 LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
167 let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);
168 assert!(sync_if_compacted(&mut cache, dir.path()));
169 assert!(!cache.is_full_delivered("/tmp/a.rs"));
170
171 cache.mark_full_delivered("/tmp/a.rs");
172 assert!(!sync_if_compacted(&mut cache, dir.path()));
173 assert!(cache.is_full_delivered("/tmp/a.rs"));
174 }
175}