Skip to main content

lean_ctx/server/
compaction_sync.rs

1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use crate::core::cache::SessionCache;
5use crate::core::context_radar::RadarEvent;
6
/// Timestamp (`ts` field of the radar event) of the most recent compaction
/// event that `sync_if_compacted` has already acted on. Starts at 0 and is
/// overwritten each time a newer compaction event is found.
pub static LAST_COMPACTION_TS: AtomicU64 = AtomicU64::new(0);
8
9/// Effective cache policy: "aggressive" (default), "safe", or "off".
10pub fn effective_cache_policy() -> &'static str {
11    static POLICY: std::sync::OnceLock<String> = std::sync::OnceLock::new();
12    POLICY.get_or_init(|| {
13        if let Ok(v) = std::env::var("LEAN_CTX_CACHE_POLICY") {
14            let v = v.trim().to_lowercase();
15            if matches!(v.as_str(), "aggressive" | "safe" | "off") {
16                return v;
17            }
18        }
19        let cfg = crate::core::config::Config::load();
20        cfg.cache_policy
21            .as_deref()
22            .unwrap_or("aggressive")
23            .to_lowercase()
24    })
25}
26
27/// Check if a host compaction event occurred since our last check.
28/// If so, reset all `full_content_delivered` flags so the next read
29/// delivers full content instead of a stub.
30pub fn sync_if_compacted(cache: &mut SessionCache, data_dir: &Path) -> bool {
31    let last_seen = LAST_COMPACTION_TS.load(Ordering::Relaxed);
32    let radar_path = data_dir.join("context_radar.jsonl");
33
34    if !radar_path.exists() {
35        return false;
36    }
37
38    let Some(latest_compaction_ts) = find_latest_compaction(&radar_path, last_seen) else {
39        return false;
40    };
41
42    LAST_COMPACTION_TS.store(latest_compaction_ts, Ordering::Relaxed);
43    let reset_count = cache.reset_delivery_flags();
44    if reset_count > 0 {
45        eprintln!(
46            "[lean-ctx] compaction detected — reset {reset_count} delivery flags for re-read"
47        );
48    }
49
50    std::thread::spawn(|| {
51        if let Some(session) = crate::core::session::SessionState::load_latest() {
52            if let Some(ref root) = session.project_root {
53                if !session.findings.is_empty() || !session.decisions.is_empty() {
54                    crate::tools::startup::auto_consolidate_knowledge(root);
55                }
56            }
57        }
58    });
59
60    true
61}
62
63/// Scan only the tail of radar JSONL for a compaction event newer than `since_ts`.
64/// Reads at most 4KB from the end to avoid unbounded I/O on large radar files.
65fn find_latest_compaction(radar_path: &Path, since_ts: u64) -> Option<u64> {
66    use std::io::{Read, Seek, SeekFrom};
67
68    let mut file = std::fs::File::open(radar_path).ok()?;
69    let file_len = file.metadata().ok()?.len();
70
71    const TAIL_BYTES: u64 = 4096;
72    let content = if file_len <= TAIL_BYTES {
73        let mut s = String::new();
74        file.read_to_string(&mut s).ok()?;
75        s
76    } else {
77        file.seek(SeekFrom::End(-(TAIL_BYTES as i64))).ok()?;
78        let mut buf = vec![0u8; TAIL_BYTES as usize];
79        let n = file.read(&mut buf).ok()?;
80        let s = String::from_utf8_lossy(&buf[..n]).into_owned();
81        // Skip first partial line (we seeked into the middle of it)
82        if let Some(idx) = s.find('\n') {
83            s[idx + 1..].to_string()
84        } else {
85            s
86        }
87    };
88
89    for line in content.lines().rev() {
90        if line.is_empty() {
91            continue;
92        }
93        let event: RadarEvent = match serde_json::from_str(line) {
94            Ok(e) => e,
95            Err(_) => continue,
96        };
97        if event.ts <= since_ts {
98            break;
99        }
100        if event.event_type == "compaction" {
101            return Some(event.ts);
102        }
103    }
104    None
105}
106
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use std::io::Write;
    use tempfile::TempDir;

    // Radar log lines used by the fixtures below.
    const MCP_CALL_EVENT: &str = r#"{"ts":1000,"event_type":"mcp_call","tokens":50}"#;
    const COMPACTION_EVENT: &str = r#"{"ts":2000,"event_type":"compaction","tokens":0}"#;

    /// Create a temp dir containing `context_radar.jsonl` with one event per line.
    fn radar_dir(events: &[&str]) -> TempDir {
        let dir = TempDir::new().unwrap();
        let mut log = std::fs::File::create(dir.path().join("context_radar.jsonl")).unwrap();
        for event in events {
            writeln!(log, "{event}").unwrap();
        }
        dir
    }

    /// Build a cache in which every given path is stored and marked as fully delivered.
    fn make_cache_with_delivered(paths: &[&str]) -> SessionCache {
        let mut cache = SessionCache::default();
        for path in paths.iter() {
            cache.store(path, "hello world");
            cache.mark_full_delivered(path);
        }
        cache
    }

    // The tests share `LAST_COMPACTION_TS`, hence `#[serial]` and the explicit
    // reset to 0 at the start of each scenario.

    #[test]
    #[serial]
    fn no_reset_without_compaction_event() {
        let dir = radar_dir(&[MCP_CALL_EVENT]);
        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);

        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);
        let compacted = sync_if_compacted(&mut cache, dir.path());

        assert!(!compacted);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }

    #[test]
    #[serial]
    fn resets_after_compaction() {
        let dir = radar_dir(&[MCP_CALL_EVENT, COMPACTION_EVENT]);
        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);

        let mut cache = make_cache_with_delivered(&["/tmp/a.rs", "/tmp/b.rs"]);
        assert!(cache.is_full_delivered("/tmp/a.rs"));

        let compacted = sync_if_compacted(&mut cache, dir.path());

        assert!(compacted);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));
        assert!(!cache.is_full_delivered("/tmp/b.rs"));
    }

    #[test]
    #[serial]
    fn does_not_double_reset() {
        let dir = radar_dir(&[COMPACTION_EVENT]);
        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);

        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);

        // First pass sees the compaction and clears the flag.
        let first = sync_if_compacted(&mut cache, dir.path());
        assert!(first);
        assert!(!cache.is_full_delivered("/tmp/a.rs"));

        // Second pass must ignore the already-handled event.
        cache.mark_full_delivered("/tmp/a.rs");
        let second = sync_if_compacted(&mut cache, dir.path());
        assert!(!second);
        assert!(cache.is_full_delivered("/tmp/a.rs"));
    }
}