Skip to main content

lean_ctx/server/
compaction_sync.rs

1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use crate::core::cache::SessionCache;
5use crate::core::context_radar::RadarEvent;
6
/// Timestamp (`ts` field of the radar event) of the most recent compaction
/// event that `sync_if_compacted` has already handled.
///
/// Starts at `0`, so any compaction event with a positive timestamp counts as
/// new until one has been processed.
pub static LAST_COMPACTION_TS: AtomicU64 = AtomicU64::new(0);
8
9/// Effective cache policy: "aggressive" (default), "safe", or "off".
10pub fn effective_cache_policy() -> &'static str {
11    static POLICY: std::sync::OnceLock<String> = std::sync::OnceLock::new();
12    POLICY.get_or_init(|| {
13        if let Ok(v) = std::env::var("LEAN_CTX_CACHE_POLICY") {
14            let v = v.trim().to_lowercase();
15            if matches!(v.as_str(), "aggressive" | "safe" | "off") {
16                return v;
17            }
18        }
19        let cfg = crate::core::config::Config::load();
20        cfg.cache_policy
21            .as_deref()
22            .unwrap_or("aggressive")
23            .to_lowercase()
24    })
25}
26
27/// Check if a host compaction event occurred since our last check.
28/// If so, reset all `full_content_delivered` flags so the next read
29/// delivers full content instead of a stub.
30pub fn sync_if_compacted(cache: &mut SessionCache, data_dir: &Path) -> bool {
31    let last_seen = LAST_COMPACTION_TS.load(Ordering::Relaxed);
32    let radar_path = data_dir.join("context_radar.jsonl");
33
34    if !radar_path.exists() {
35        return false;
36    }
37
38    let Some(latest_compaction_ts) = find_latest_compaction(&radar_path, last_seen) else {
39        return false;
40    };
41
42    LAST_COMPACTION_TS.store(latest_compaction_ts, Ordering::Relaxed);
43    crate::core::search_delta::reset();
44    let reset_count = cache.reset_delivery_flags();
45    if reset_count > 0 {
46        eprintln!(
47            "[lean-ctx] compaction detected — reset {reset_count} delivery flags for re-read"
48        );
49    }
50
51    std::thread::spawn(|| {
52        if let Some(session) = crate::core::session::SessionState::load_latest() {
53            if let Some(ref root) = session.project_root {
54                if !session.findings.is_empty() || !session.decisions.is_empty() {
55                    crate::tools::startup::auto_consolidate_knowledge(root);
56                }
57            }
58        }
59    });
60
61    true
62}
63
64/// Scan only the tail of radar JSONL for a compaction event newer than `since_ts`.
65/// Reads at most 4KB from the end to avoid unbounded I/O on large radar files.
66fn find_latest_compaction(radar_path: &Path, since_ts: u64) -> Option<u64> {
67    use std::io::{Read, Seek, SeekFrom};
68
69    let mut file = std::fs::File::open(radar_path).ok()?;
70    let file_len = file.metadata().ok()?.len();
71
72    const TAIL_BYTES: u64 = 4096;
73    let content = if file_len <= TAIL_BYTES {
74        let mut s = String::new();
75        file.read_to_string(&mut s).ok()?;
76        s
77    } else {
78        file.seek(SeekFrom::End(-(TAIL_BYTES as i64))).ok()?;
79        let mut buf = vec![0u8; TAIL_BYTES as usize];
80        let n = file.read(&mut buf).ok()?;
81        let s = String::from_utf8_lossy(&buf[..n]).into_owned();
82        // Skip first partial line (we seeked into the middle of it)
83        if let Some(idx) = s.find('\n') {
84            s[idx + 1..].to_string()
85        } else {
86            s
87        }
88    };
89
90    for line in content.lines().rev() {
91        if line.is_empty() {
92            continue;
93        }
94        let event: RadarEvent = match serde_json::from_str(line) {
95            Ok(e) => e,
96            Err(_) => continue,
97        };
98        if event.ts <= since_ts {
99            break;
100        }
101        if event.event_type == "compaction" {
102            return Some(event.ts);
103        }
104    }
105    None
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::io::Write;
    use tempfile::TempDir;

    // Radar events shared by the tests below, one JSON object per line.
    const MCP_CALL_AT_1000: &str = r#"{"ts":1000,"event_type":"mcp_call","tokens":50}"#;
    const COMPACTION_AT_2000: &str = r#"{"ts":2000,"event_type":"compaction","tokens":0}"#;

    /// Build a cache in which every path in `paths` is stored and already
    /// marked as fully delivered.
    fn make_cache_with_delivered(paths: &[&str]) -> SessionCache {
        let mut cache = SessionCache::default();
        paths.iter().for_each(|p| {
            cache.store(p, "hello world");
            cache.mark_full_delivered(p);
        });
        cache
    }

    /// Create a temporary data dir whose `context_radar.jsonl` holds `events`,
    /// one per line. The file is closed before the directory is returned.
    fn radar_dir(events: &[&str]) -> TempDir {
        let dir = TempDir::new().unwrap();
        let mut radar = std::fs::File::create(dir.path().join("context_radar.jsonl")).unwrap();
        for event in events {
            writeln!(radar, "{event}").unwrap();
        }
        dir
    }

    // The tests share the global `LAST_COMPACTION_TS`, hence `#[serial]` and
    // the explicit reset to 0 at the start of each one.

    #[test]
    #[serial]
    fn no_reset_without_compaction_event() {
        let dir = radar_dir(&[MCP_CALL_AT_1000]);
        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);

        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);
        let synced = sync_if_compacted(&mut cache, dir.path());

        assert!(!synced);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }

    #[test]
    #[serial]
    fn resets_after_compaction() {
        let dir = radar_dir(&[MCP_CALL_AT_1000, COMPACTION_AT_2000]);
        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);

        let mut cache = make_cache_with_delivered(&["/tmp/a.rs", "/tmp/b.rs"]);
        assert!(cache.is_full_delivered("/tmp/a.rs"));

        let synced = sync_if_compacted(&mut cache, dir.path());

        assert!(synced);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));
        assert!(!cache.is_full_delivered("/tmp/b.rs"));
    }

    #[test]
    #[serial]
    fn does_not_double_reset() {
        let dir = radar_dir(&[COMPACTION_AT_2000]);
        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);

        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);

        // First pass sees the compaction and clears the flag.
        let first = sync_if_compacted(&mut cache, dir.path());
        assert!(first);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));

        // Second pass must ignore the already-handled event.
        cache.mark_full_delivered("/tmp/a.rs");
        let second = sync_if_compacted(&mut cache, dir.path());
        assert!(!second);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }
}