Skip to main content

lean_ctx/core/
events.rs

1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
6const RING_CAPACITY: usize = 1000;
7const JSONL_MAX_LINES: usize = 10_000;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
10pub struct LeanCtxEvent {
11    pub id: u64,
12    pub timestamp: String,
13    pub kind: EventKind,
14}
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17#[serde(tag = "type")]
18pub enum EventKind {
19    ToolCall {
20        tool: String,
21        tokens_original: u64,
22        tokens_saved: u64,
23        mode: Option<String>,
24        duration_ms: u64,
25        path: Option<String>,
26    },
27    CacheHit {
28        path: String,
29        saved_tokens: u64,
30    },
31    Compression {
32        path: String,
33        before_lines: u32,
34        after_lines: u32,
35        strategy: String,
36        kept_line_count: u32,
37        removed_line_count: u32,
38    },
39    AgentAction {
40        agent_id: String,
41        action: String,
42        tool: Option<String>,
43    },
44    KnowledgeUpdate {
45        category: String,
46        key: String,
47        action: String,
48    },
49    ThresholdShift {
50        language: String,
51        old_entropy: f64,
52        new_entropy: f64,
53        old_jaccard: f64,
54        new_jaccard: f64,
55    },
56}
57
58struct EventBus {
59    seq: AtomicU64,
60    ring: Mutex<VecDeque<LeanCtxEvent>>,
61}
62
63impl EventBus {
64    fn new() -> Self {
65        Self {
66            seq: AtomicU64::new(0),
67            ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
68        }
69    }
70
71    fn emit(&self, kind: EventKind) -> u64 {
72        let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
73        let event = LeanCtxEvent {
74            id,
75            timestamp: chrono::Local::now()
76                .format("%Y-%m-%dT%H:%M:%S%.3f")
77                .to_string(),
78            kind,
79        };
80
81        {
82            let mut ring = self
83                .ring
84                .lock()
85                .unwrap_or_else(std::sync::PoisonError::into_inner);
86            if ring.len() >= RING_CAPACITY {
87                ring.pop_front();
88            }
89            ring.push_back(event.clone());
90        }
91
92        append_jsonl(&event);
93        id
94    }
95
96    fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
97        let ring = self
98            .ring
99            .lock()
100            .unwrap_or_else(std::sync::PoisonError::into_inner);
101        ring.iter().filter(|e| e.id > after_id).cloned().collect()
102    }
103
104    fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
105        let ring = self
106            .ring
107            .lock()
108            .unwrap_or_else(std::sync::PoisonError::into_inner);
109        let len = ring.len();
110        let start = len.saturating_sub(n);
111        ring.iter().skip(start).cloned().collect()
112    }
113}
114
115fn bus() -> &'static EventBus {
116    static INSTANCE: OnceLock<EventBus> = OnceLock::new();
117    INSTANCE.get_or_init(EventBus::new)
118}
119
120fn jsonl_path() -> Option<std::path::PathBuf> {
121    crate::core::data_dir::lean_ctx_data_dir()
122        .ok()
123        .map(|d| d.join("events.jsonl"))
124}
125
126fn append_jsonl(event: &LeanCtxEvent) {
127    let Some(path) = jsonl_path() else { return };
128
129    if let Some(parent) = path.parent() {
130        let _ = std::fs::create_dir_all(parent);
131    }
132
133    if let Ok(content) = std::fs::read_to_string(&path) {
134        let lines = content.lines().count();
135        if lines >= JSONL_MAX_LINES {
136            let old = path.with_extension("jsonl.old");
137            let _ = std::fs::remove_file(&old);
138            let _ = std::fs::rename(&path, &old);
139        }
140    }
141
142    if let Ok(json) = serde_json::to_string(event) {
143        use std::io::Write;
144        if let Ok(mut f) = std::fs::OpenOptions::new()
145            .create(true)
146            .append(true)
147            .open(&path)
148        {
149            let _ = writeln!(f, "{json}");
150        }
151    }
152}
153
154// --- Public API ---
155
156pub fn emit(kind: EventKind) -> u64 {
157    bus().emit(kind)
158}
159
160pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
161    bus().events_since(after_id)
162}
163
164pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
165    bus().latest_events(n)
166}
167
168pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
169    let Some(path) = jsonl_path() else {
170        return Vec::new();
171    };
172    let Ok(content) = std::fs::read_to_string(&path) else {
173        return Vec::new();
174    };
175    let all: Vec<LeanCtxEvent> = content
176        .lines()
177        .filter(|l| !l.trim().is_empty())
178        .filter_map(|l| serde_json::from_str(l).ok())
179        .collect();
180    let start = all.len().saturating_sub(n);
181    all[start..].to_vec()
182}
183
184pub fn emit_tool_call(
185    tool: &str,
186    tokens_original: u64,
187    tokens_saved: u64,
188    mode: Option<String>,
189    duration_ms: u64,
190    path: Option<String>,
191) {
192    emit(EventKind::ToolCall {
193        tool: tool.to_string(),
194        tokens_original,
195        tokens_saved,
196        mode,
197        duration_ms,
198        path,
199    });
200}
201
202pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
203    emit(EventKind::CacheHit {
204        path: path.to_string(),
205        saved_tokens,
206    });
207}
208
209pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
210    emit(EventKind::AgentAction {
211        agent_id: agent_id.to_string(),
212        action: action.to_string(),
213        tool: tool.map(std::string::ToString::to_string),
214    });
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220
221    #[test]
222    fn emit_returns_positive_id() {
223        let id = emit(EventKind::ToolCall {
224            tool: "ctx_read".to_string(),
225            tokens_original: 1000,
226            tokens_saved: 800,
227            mode: Some("map".to_string()),
228            duration_ms: 5,
229            path: Some("src/main.rs".to_string()),
230        });
231        assert!(id > 0);
232        let events = latest_events(100);
233        assert!(events.iter().any(|e| e.id == id));
234    }
235
236    #[test]
237    fn events_since_filters_correctly() {
238        let id1 = emit(EventKind::CacheHit {
239            path: "filter_test_a.rs".to_string(),
240            saved_tokens: 100,
241        });
242        let id2 = emit(EventKind::CacheHit {
243            path: "filter_test_b.rs".to_string(),
244            saved_tokens: 200,
245        });
246
247        let after = events_since(id1);
248        assert!(after.iter().any(|e| e.id == id2));
249        assert!(after.iter().all(|e| e.id > id1));
250    }
251}