lean_ctx/server/
compaction_sync.rs1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use crate::core::cache::SessionCache;
5use crate::core::context_radar::RadarEvent;
6
/// Timestamp of the newest compaction event that `sync_if_compacted` has already acted on.
///
/// Starts at `0` (nothing handled yet). The value is compared against `RadarEvent::ts`, so it
/// uses whatever unit the radar log records.
pub static LAST_COMPACTION_TS: AtomicU64 = AtomicU64::new(0);
8
9pub fn effective_cache_policy() -> &'static str {
11 static POLICY: std::sync::OnceLock<String> = std::sync::OnceLock::new();
12 POLICY.get_or_init(|| {
13 if let Ok(v) = std::env::var("LEAN_CTX_CACHE_POLICY") {
14 let v = v.trim().to_lowercase();
15 if matches!(v.as_str(), "aggressive" | "safe" | "off") {
16 return v;
17 }
18 }
19 let cfg = crate::core::config::Config::load();
20 cfg.cache_policy
21 .as_deref()
22 .unwrap_or("aggressive")
23 .to_lowercase()
24 })
25}
26
27pub fn sync_if_compacted(cache: &mut SessionCache, data_dir: &Path) -> bool {
31 let last_seen = LAST_COMPACTION_TS.load(Ordering::Relaxed);
32 let radar_path = data_dir.join("context_radar.jsonl");
33
34 if !radar_path.exists() {
35 return false;
36 }
37
38 let Some(latest_compaction_ts) = find_latest_compaction(&radar_path, last_seen) else {
39 return false;
40 };
41
42 LAST_COMPACTION_TS.store(latest_compaction_ts, Ordering::Relaxed);
43 crate::core::search_delta::reset();
44 let reset_count = cache.reset_delivery_flags();
45 if reset_count > 0 {
46 eprintln!(
47 "[lean-ctx] compaction detected — reset {reset_count} delivery flags for re-read"
48 );
49 }
50
51 std::thread::spawn(|| {
52 if let Some(session) = crate::core::session::SessionState::load_latest() {
53 if let Some(ref root) = session.project_root {
54 if !session.findings.is_empty() || !session.decisions.is_empty() {
55 crate::tools::startup::auto_consolidate_knowledge(root);
56 }
57 }
58 }
59 });
60
61 true
62}
63
64fn find_latest_compaction(radar_path: &Path, since_ts: u64) -> Option<u64> {
67 use std::io::{Read, Seek, SeekFrom};
68
69 let mut file = std::fs::File::open(radar_path).ok()?;
70 let file_len = file.metadata().ok()?.len();
71
72 const TAIL_BYTES: u64 = 4096;
73 let content = if file_len <= TAIL_BYTES {
74 let mut s = String::new();
75 file.read_to_string(&mut s).ok()?;
76 s
77 } else {
78 file.seek(SeekFrom::End(-(TAIL_BYTES as i64))).ok()?;
79 let mut buf = vec![0u8; TAIL_BYTES as usize];
80 let n = file.read(&mut buf).ok()?;
81 let s = String::from_utf8_lossy(&buf[..n]).into_owned();
82 if let Some(idx) = s.find('\n') {
84 s[idx + 1..].to_string()
85 } else {
86 s
87 }
88 };
89
90 for line in content.lines().rev() {
91 if line.is_empty() {
92 continue;
93 }
94 let event: RadarEvent = match serde_json::from_str(line) {
95 Ok(e) => e,
96 Err(_) => continue,
97 };
98 if event.ts <= since_ts {
99 break;
100 }
101 if event.event_type == "compaction" {
102 return Some(event.ts);
103 }
104 }
105 None
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::io::Write;
    use tempfile::TempDir;

    // Every test below resets the shared `LAST_COMPACTION_TS` static, hence `#[serial]`.

    /// Creates an empty `context_radar.jsonl` inside `dir` and returns the open handle.
    fn create_radar_log(dir: &TempDir) -> std::fs::File {
        std::fs::File::create(dir.path().join("context_radar.jsonl")).unwrap()
    }

    /// Builds a cache in which every given path is stored and marked as fully delivered.
    fn make_cache_with_delivered(paths: &[&str]) -> SessionCache {
        let mut cache = SessionCache::default();
        paths.iter().for_each(|p| {
            cache.store(p, "hello world");
            cache.mark_full_delivered(p);
        });
        cache
    }

    /// A log without any compaction event must leave delivery flags untouched.
    #[test]
    #[serial]
    fn no_reset_without_compaction_event() {
        let dir = TempDir::new().unwrap();
        {
            let mut log = create_radar_log(&dir);
            writeln!(log, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
        }

        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);

        let synced = sync_if_compacted(&mut cache, dir.path());
        assert!(!synced);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }

    /// A compaction event newer than the last seen timestamp clears every delivery flag.
    #[test]
    #[serial]
    fn resets_after_compaction() {
        let dir = TempDir::new().unwrap();
        {
            let mut log = create_radar_log(&dir);
            writeln!(log, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
            writeln!(log, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
        }

        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
        let mut cache = make_cache_with_delivered(&["/tmp/a.rs", "/tmp/b.rs"]);
        assert!(cache.is_full_delivered("/tmp/a.rs"));

        let synced = sync_if_compacted(&mut cache, dir.path());
        assert!(synced);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));
        assert!(!cache.is_full_delivered("/tmp/b.rs"));
    }

    /// The same compaction event must not trigger a second reset on a later call.
    #[test]
    #[serial]
    fn does_not_double_reset() {
        let dir = TempDir::new().unwrap();
        {
            let mut log = create_radar_log(&dir);
            writeln!(log, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
        }

        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);

        let first = sync_if_compacted(&mut cache, dir.path());
        assert!(first);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));

        // Re-deliver, then sync again against the unchanged log.
        cache.mark_full_delivered("/tmp/a.rs");
        let second = sync_if_compacted(&mut cache, dir.path());
        assert!(!second);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }
}