1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
6const RING_CAPACITY: usize = 1000;
7const JSONL_MAX_LINES: usize = 10_000;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
10pub struct LeanCtxEvent {
11 pub id: u64,
12 pub timestamp: String,
13 pub kind: EventKind,
14}
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17#[serde(tag = "type")]
18pub enum EventKind {
19 ToolCall {
20 tool: String,
21 tokens_original: u64,
22 tokens_saved: u64,
23 mode: Option<String>,
24 duration_ms: u64,
25 path: Option<String>,
26 },
27 CacheHit {
28 path: String,
29 saved_tokens: u64,
30 },
31 Compression {
32 path: String,
33 before_lines: u32,
34 after_lines: u32,
35 strategy: String,
36 kept_line_count: u32,
37 removed_line_count: u32,
38 },
39 AgentAction {
40 agent_id: String,
41 action: String,
42 tool: Option<String>,
43 },
44 KnowledgeUpdate {
45 category: String,
46 key: String,
47 action: String,
48 },
49 ThresholdShift {
50 language: String,
51 old_entropy: f64,
52 new_entropy: f64,
53 old_jaccard: f64,
54 new_jaccard: f64,
55 },
56}
57
58struct EventBus {
59 seq: AtomicU64,
60 ring: Mutex<VecDeque<LeanCtxEvent>>,
61}
62
63impl EventBus {
64 fn new() -> Self {
65 Self {
66 seq: AtomicU64::new(0),
67 ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
68 }
69 }
70
71 fn emit(&self, kind: EventKind) -> u64 {
72 let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
73 let event = LeanCtxEvent {
74 id,
75 timestamp: chrono::Local::now()
76 .format("%Y-%m-%dT%H:%M:%S%.3f")
77 .to_string(),
78 kind,
79 };
80
81 {
82 let mut ring = self.ring.lock().unwrap_or_else(|e| e.into_inner());
83 if ring.len() >= RING_CAPACITY {
84 ring.pop_front();
85 }
86 ring.push_back(event.clone());
87 }
88
89 append_jsonl(&event);
90 id
91 }
92
93 fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
94 let ring = self.ring.lock().unwrap_or_else(|e| e.into_inner());
95 ring.iter().filter(|e| e.id > after_id).cloned().collect()
96 }
97
98 fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
99 let ring = self.ring.lock().unwrap_or_else(|e| e.into_inner());
100 let len = ring.len();
101 let start = len.saturating_sub(n);
102 ring.iter().skip(start).cloned().collect()
103 }
104}
105
106fn bus() -> &'static EventBus {
107 static INSTANCE: OnceLock<EventBus> = OnceLock::new();
108 INSTANCE.get_or_init(EventBus::new)
109}
110
111fn jsonl_path() -> Option<std::path::PathBuf> {
112 dirs::home_dir().map(|h| h.join(".lean-ctx").join("events.jsonl"))
113}
114
115fn append_jsonl(event: &LeanCtxEvent) {
116 let Some(path) = jsonl_path() else { return };
117
118 if let Some(parent) = path.parent() {
119 let _ = std::fs::create_dir_all(parent);
120 }
121
122 if let Ok(content) = std::fs::read_to_string(&path) {
123 let lines = content.lines().count();
124 if lines >= JSONL_MAX_LINES {
125 let old = path.with_extension("jsonl.old");
126 let _ = std::fs::remove_file(&old);
127 let _ = std::fs::rename(&path, &old);
128 }
129 }
130
131 if let Ok(json) = serde_json::to_string(event) {
132 use std::io::Write;
133 if let Ok(mut f) = std::fs::OpenOptions::new()
134 .create(true)
135 .append(true)
136 .open(&path)
137 {
138 let _ = writeln!(f, "{json}");
139 }
140 }
141}
142
143pub fn emit(kind: EventKind) -> u64 {
146 bus().emit(kind)
147}
148
149pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
150 bus().events_since(after_id)
151}
152
153pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
154 bus().latest_events(n)
155}
156
157pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
158 let Some(path) = jsonl_path() else {
159 return Vec::new();
160 };
161 let Ok(content) = std::fs::read_to_string(&path) else {
162 return Vec::new();
163 };
164 let all: Vec<LeanCtxEvent> = content
165 .lines()
166 .filter(|l| !l.trim().is_empty())
167 .filter_map(|l| serde_json::from_str(l).ok())
168 .collect();
169 let start = all.len().saturating_sub(n);
170 all[start..].to_vec()
171}
172
173pub fn emit_tool_call(
174 tool: &str,
175 tokens_original: u64,
176 tokens_saved: u64,
177 mode: Option<String>,
178 duration_ms: u64,
179 path: Option<String>,
180) {
181 emit(EventKind::ToolCall {
182 tool: tool.to_string(),
183 tokens_original,
184 tokens_saved,
185 mode,
186 duration_ms,
187 path,
188 });
189}
190
191pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
192 emit(EventKind::CacheHit {
193 path: path.to_string(),
194 saved_tokens,
195 });
196}
197
198pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
199 emit(EventKind::AgentAction {
200 agent_id: agent_id.to_string(),
201 action: action.to_string(),
202 tool: tool.map(|t| t.to_string()),
203 });
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// An emitted event gets a non-zero id and shows up among the latest
    /// buffered events.
    #[test]
    fn emit_returns_positive_id() {
        let kind = EventKind::ToolCall {
            tool: String::from("ctx_read"),
            tokens_original: 1000,
            tokens_saved: 800,
            mode: Some(String::from("map")),
            duration_ms: 5,
            path: Some(String::from("src/main.rs")),
        };

        let id = emit(kind);
        assert!(id > 0);

        let recent = latest_events(100);
        assert!(recent.iter().any(|e| e.id == id));
    }

    /// `events_since(x)` includes later events and nothing with an id at or
    /// below `x`.
    #[test]
    fn events_since_filters_correctly() {
        let cache_hit = |path: &str, saved_tokens: u64| EventKind::CacheHit {
            path: String::from(path),
            saved_tokens,
        };

        let id1 = emit(cache_hit("filter_test_a.rs", 100));
        let id2 = emit(cache_hit("filter_test_b.rs", 200));

        let after = events_since(id1);
        assert!(after.iter().any(|e| e.id == id2));
        assert!(after.iter().all(|e| e.id > id1));
    }
}