Skip to main content

lean_ctx/server/
compaction_sync.rs

1use std::path::Path;
2use std::sync::atomic::{AtomicU64, Ordering};
3
4use crate::core::cache::SessionCache;
5use crate::core::context_radar::RadarEvent;
6
7pub static LAST_COMPACTION_TS: AtomicU64 = AtomicU64::new(0);
8
9/// Effective cache policy: "aggressive" (default), "safe", or "off".
10pub fn effective_cache_policy() -> &'static str {
11    static POLICY: std::sync::OnceLock<String> = std::sync::OnceLock::new();
12    POLICY.get_or_init(|| {
13        if let Ok(v) = std::env::var("LEAN_CTX_CACHE_POLICY") {
14            let v = v.trim().to_lowercase();
15            if matches!(v.as_str(), "aggressive" | "safe" | "off") {
16                return v;
17            }
18        }
19        let cfg = crate::core::config::Config::load();
20        cfg.cache_policy
21            .as_deref()
22            .unwrap_or("aggressive")
23            .to_lowercase()
24    })
25}
26
27/// Check if a host compaction event occurred since our last check.
28/// If so, reset all `full_content_delivered` flags so the next read
29/// delivers full content instead of a stub.
30pub fn sync_if_compacted(cache: &mut SessionCache, data_dir: &Path) -> bool {
31    let last_seen = LAST_COMPACTION_TS.load(Ordering::Relaxed);
32    let radar_path = data_dir.join("context_radar.jsonl");
33
34    if !radar_path.exists() {
35        return false;
36    }
37
38    let Some(latest_compaction_ts) = find_latest_compaction(&radar_path, last_seen) else {
39        return false;
40    };
41
42    LAST_COMPACTION_TS.store(latest_compaction_ts, Ordering::Relaxed);
43    let reset_count = cache.reset_delivery_flags();
44    if reset_count > 0 {
45        eprintln!(
46            "[lean-ctx] compaction detected — reset {reset_count} delivery flags for re-read"
47        );
48    }
49    true
50}
51
52/// Scan only the tail of radar JSONL for a compaction event newer than `since_ts`.
53/// Reads at most 4KB from the end to avoid unbounded I/O on large radar files.
54fn find_latest_compaction(radar_path: &Path, since_ts: u64) -> Option<u64> {
55    use std::io::{Read, Seek, SeekFrom};
56
57    let mut file = std::fs::File::open(radar_path).ok()?;
58    let file_len = file.metadata().ok()?.len();
59
60    const TAIL_BYTES: u64 = 4096;
61    let content = if file_len <= TAIL_BYTES {
62        let mut s = String::new();
63        file.read_to_string(&mut s).ok()?;
64        s
65    } else {
66        file.seek(SeekFrom::End(-(TAIL_BYTES as i64))).ok()?;
67        let mut buf = vec![0u8; TAIL_BYTES as usize];
68        let n = file.read(&mut buf).ok()?;
69        let s = String::from_utf8_lossy(&buf[..n]).into_owned();
70        // Skip first partial line (we seeked into the middle of it)
71        if let Some(idx) = s.find('\n') {
72            s[idx + 1..].to_string()
73        } else {
74            s
75        }
76    };
77
78    for line in content.lines().rev() {
79        if line.is_empty() {
80            continue;
81        }
82        let event: RadarEvent = match serde_json::from_str(line) {
83            Ok(e) => e,
84            Err(_) => continue,
85        };
86        if event.ts <= since_ts {
87            break;
88        }
89        if event.event_type == "compaction" {
90            return Some(event.ts);
91        }
92    }
93    None
94}
95
96#[cfg(test)]
97mod tests {
98    use super::*;
99    use serial_test::serial;
100    use std::io::Write;
101    use tempfile::TempDir;
102
103    fn make_cache_with_delivered(paths: &[&str]) -> SessionCache {
104        let mut cache = SessionCache::default();
105        for p in paths {
106            cache.store(p, "hello world");
107            cache.mark_full_delivered(p);
108        }
109        cache
110    }
111
112    #[test]
113    #[serial]
114    fn no_reset_without_compaction_event() {
115        let dir = TempDir::new().unwrap();
116        let radar = dir.path().join("context_radar.jsonl");
117        let mut f = std::fs::File::create(&radar).unwrap();
118        writeln!(f, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
119        drop(f);
120
121        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
122        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);
123        assert!(!sync_if_compacted(&mut cache, dir.path()));
124        assert!(cache.is_full_delivered("/tmp/a.rs"));
125    }
126
127    #[test]
128    #[serial]
129    fn resets_after_compaction() {
130        let dir = TempDir::new().unwrap();
131        let radar = dir.path().join("context_radar.jsonl");
132        let mut f = std::fs::File::create(&radar).unwrap();
133        writeln!(f, r#"{{"ts":1000,"event_type":"mcp_call","tokens":50}}"#).unwrap();
134        writeln!(f, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
135        drop(f);
136
137        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
138        let mut cache = make_cache_with_delivered(&["/tmp/a.rs", "/tmp/b.rs"]);
139
140        assert!(cache.is_full_delivered("/tmp/a.rs"));
141        assert!(sync_if_compacted(&mut cache, dir.path()));
142        assert!(!cache.is_full_delivered("/tmp/a.rs"));
143        assert!(!cache.is_full_delivered("/tmp/b.rs"));
144    }
145
146    #[test]
147    #[serial]
148    fn does_not_double_reset() {
149        let dir = TempDir::new().unwrap();
150        let radar = dir.path().join("context_radar.jsonl");
151        let mut f = std::fs::File::create(&radar).unwrap();
152        writeln!(f, r#"{{"ts":2000,"event_type":"compaction","tokens":0}}"#).unwrap();
153        drop(f);
154
155        LAST_COMPACTION_TS.store(0, Ordering::Relaxed);
156        let mut cache = make_cache_with_delivered(&["/tmp/a.rs"]);
157        assert!(sync_if_compacted(&mut cache, dir.path()));
158        assert!(!cache.is_full_delivered("/tmp/a.rs"));
159
160        cache.mark_full_delivered("/tmp/a.rs");
161        assert!(!sync_if_compacted(&mut cache, dir.path()));
162        assert!(cache.is_full_delivered("/tmp/a.rs"));
163    }
164}