1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
/// Maximum number of events kept in the in-memory ring buffer.
const RING_CAPACITY: usize = 1_000;

/// Line count at which the on-disk JSONL log is rotated.
const JSONL_MAX_LINES: usize = 10_000;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
10pub struct LeanCtxEvent {
11 pub id: u64,
12 pub timestamp: String,
13 pub kind: EventKind,
14}
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17#[serde(tag = "type")]
18pub enum EventKind {
19 ToolCall {
20 tool: String,
21 tokens_original: u64,
22 tokens_saved: u64,
23 mode: Option<String>,
24 duration_ms: u64,
25 path: Option<String>,
26 },
27 CacheHit {
28 path: String,
29 saved_tokens: u64,
30 },
31 Compression {
32 path: String,
33 before_lines: u32,
34 after_lines: u32,
35 strategy: String,
36 kept_line_count: u32,
37 removed_line_count: u32,
38 },
39 AgentAction {
40 agent_id: String,
41 action: String,
42 tool: Option<String>,
43 },
44 KnowledgeUpdate {
45 category: String,
46 key: String,
47 action: String,
48 },
49 ThresholdShift {
50 language: String,
51 old_entropy: f64,
52 new_entropy: f64,
53 old_jaccard: f64,
54 new_jaccard: f64,
55 },
56 BudgetWarning {
57 role: String,
58 dimension: String,
59 used: String,
60 limit: String,
61 percent: u8,
62 },
63 BudgetExhausted {
64 role: String,
65 dimension: String,
66 used: String,
67 limit: String,
68 },
69 PolicyViolation {
70 role: String,
71 tool: String,
72 reason: String,
73 },
74 RoleChanged {
75 from: String,
76 to: String,
77 },
78 ProfileChanged {
79 from: String,
80 to: String,
81 },
82 SloViolation {
83 slo_name: String,
84 metric: String,
85 threshold: f64,
86 actual: f64,
87 action: String,
88 },
89 Anomaly {
90 metric: String,
91 expected: f64,
92 actual: f64,
93 deviation_factor: f64,
94 },
95 VerificationWarning {
96 warning_kind: String,
97 detail: String,
98 severity: String,
99 },
100 ThresholdAdapted {
101 language: String,
102 arm: String,
103 old_threshold: f64,
104 new_threshold: f64,
105 },
106}
107
108struct EventBus {
109 seq: AtomicU64,
110 ring: Mutex<VecDeque<LeanCtxEvent>>,
111}
112
113impl EventBus {
114 fn new() -> Self {
115 Self {
116 seq: AtomicU64::new(0),
117 ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
118 }
119 }
120
121 fn emit(&self, kind: EventKind) -> u64 {
122 let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
123 let event = LeanCtxEvent {
124 id,
125 timestamp: chrono::Local::now()
126 .format("%Y-%m-%dT%H:%M:%S%.3f")
127 .to_string(),
128 kind,
129 };
130
131 {
132 let mut ring = self
133 .ring
134 .lock()
135 .unwrap_or_else(std::sync::PoisonError::into_inner);
136 if ring.len() >= RING_CAPACITY {
137 ring.pop_front();
138 }
139 ring.push_back(event.clone());
140 }
141
142 append_jsonl(&event);
143 id
144 }
145
146 fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
147 let ring = self
148 .ring
149 .lock()
150 .unwrap_or_else(std::sync::PoisonError::into_inner);
151 ring.iter().filter(|e| e.id > after_id).cloned().collect()
152 }
153
154 fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
155 let ring = self
156 .ring
157 .lock()
158 .unwrap_or_else(std::sync::PoisonError::into_inner);
159 let len = ring.len();
160 let start = len.saturating_sub(n);
161 ring.iter().skip(start).cloned().collect()
162 }
163}
164
165fn bus() -> &'static EventBus {
166 static INSTANCE: OnceLock<EventBus> = OnceLock::new();
167 INSTANCE.get_or_init(EventBus::new)
168}
169
170fn jsonl_path() -> Option<std::path::PathBuf> {
171 crate::core::data_dir::lean_ctx_data_dir()
172 .ok()
173 .map(|d| d.join("events.jsonl"))
174}
175
/// Reports whether event persistence should be skipped because the process
/// looks like a test run.
///
/// The answer is computed once and cached for the lifetime of the process.
/// It is `true` when any of the following holds, checked in this order:
/// the crate was compiled with `cfg(test)`, the `__LEAN_CTX_SKIP_EVENTS`
/// environment variable is set, or the current executable path contains a
/// `deps` directory component.
fn is_test_environment() -> bool {
    static CACHED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();

    fn detect() -> bool {
        let skip_requested = || std::env::var_os("__LEAN_CTX_SKIP_EVENTS").is_some();
        let exe_under_deps = || match std::env::current_exe() {
            Ok(exe) => {
                let shown = exe.to_string_lossy();
                shown.contains("/deps/") || shown.contains("\\deps\\")
            }
            Err(_) => false,
        };
        cfg!(test) || skip_requested() || exe_under_deps()
    }

    *CACHED.get_or_init(detect)
}
192
193fn append_jsonl(event: &LeanCtxEvent) {
194 if is_test_environment() {
195 return;
196 }
197 let Some(path) = jsonl_path() else { return };
198
199 if let Some(parent) = path.parent() {
200 let _ = std::fs::create_dir_all(parent);
201 }
202
203 if let Ok(content) = std::fs::read_to_string(&path) {
204 let lines = content.lines().count();
205 if lines >= JSONL_MAX_LINES {
206 let old = path.with_extension("jsonl.old");
207 let _ = std::fs::remove_file(&old);
208 let _ = std::fs::rename(&path, &old);
209 }
210 }
211
212 if let Ok(json) = serde_json::to_string(event) {
213 use std::io::Write;
214 if let Ok(mut f) = std::fs::OpenOptions::new()
215 .create(true)
216 .append(true)
217 .open(&path)
218 {
219 let _ = writeln!(f, "{json}");
220 }
221 }
222}
223
224pub fn emit(kind: EventKind) -> u64 {
227 bus().emit(kind)
228}
229
230pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
231 bus().events_since(after_id)
232}
233
234pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
235 bus().latest_events(n)
236}
237
238pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
239 let Some(path) = jsonl_path() else {
240 return Vec::new();
241 };
242 let Ok(content) = std::fs::read_to_string(&path) else {
243 return Vec::new();
244 };
245 let all: Vec<LeanCtxEvent> = content
246 .lines()
247 .filter(|l| !l.trim().is_empty())
248 .filter_map(|l| serde_json::from_str(l).ok())
249 .collect();
250 let start = all.len().saturating_sub(n);
251 all[start..].to_vec()
252}
253
254pub fn emit_tool_call(
255 tool: &str,
256 tokens_original: u64,
257 tokens_saved: u64,
258 mode: Option<String>,
259 duration_ms: u64,
260 path: Option<String>,
261) {
262 emit(EventKind::ToolCall {
263 tool: tool.to_string(),
264 tokens_original,
265 tokens_saved,
266 mode,
267 duration_ms,
268 path,
269 });
270}
271
272pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
273 emit(EventKind::CacheHit {
274 path: path.to_string(),
275 saved_tokens,
276 });
277}
278
279pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
280 emit(EventKind::AgentAction {
281 agent_id: agent_id.to_string(),
282 action: action.to_string(),
283 tool: tool.map(std::string::ToString::to_string),
284 });
285}
286
287pub fn emit_budget_warning(role: &str, dimension: &str, used: &str, limit: &str, percent: u8) {
288 emit(EventKind::BudgetWarning {
289 role: role.to_string(),
290 dimension: dimension.to_string(),
291 used: used.to_string(),
292 limit: limit.to_string(),
293 percent,
294 });
295}
296
297pub fn emit_budget_exhausted(role: &str, dimension: &str, used: &str, limit: &str) {
298 emit(EventKind::BudgetExhausted {
299 role: role.to_string(),
300 dimension: dimension.to_string(),
301 used: used.to_string(),
302 limit: limit.to_string(),
303 });
304}
305
306pub fn emit_policy_violation(role: &str, tool: &str, reason: &str) {
307 emit(EventKind::PolicyViolation {
308 role: role.to_string(),
309 tool: tool.to_string(),
310 reason: reason.to_string(),
311 });
312}
313
314pub fn emit_role_changed(from: &str, to: &str) {
315 emit(EventKind::RoleChanged {
316 from: from.to_string(),
317 to: to.to_string(),
318 });
319}
320
321pub fn emit_profile_changed(from: &str, to: &str) {
322 emit(EventKind::ProfileChanged {
323 from: from.to_string(),
324 to: to.to_string(),
325 });
326}
327
328pub fn emit_slo_violation(slo_name: &str, metric: &str, threshold: f64, actual: f64, action: &str) {
329 emit(EventKind::SloViolation {
330 slo_name: slo_name.to_string(),
331 metric: metric.to_string(),
332 threshold,
333 actual,
334 action: action.to_string(),
335 });
336}
337
338pub fn emit_anomaly(metric: &str, expected: f64, actual: f64, deviation_factor: f64) {
339 emit(EventKind::Anomaly {
340 metric: metric.to_string(),
341 expected,
342 actual,
343 deviation_factor,
344 });
345}
346
347pub fn emit_verification_warning(warning_kind: &str, detail: &str, severity: &str) {
348 emit(EventKind::VerificationWarning {
349 warning_kind: warning_kind.to_string(),
350 detail: detail.to_string(),
351 severity: severity.to_string(),
352 });
353}
354
355pub fn emit_threshold_adapted(language: &str, arm: &str, old_threshold: f64, new_threshold: f64) {
356 emit(EventKind::ThresholdAdapted {
357 language: language.to_string(),
358 arm: arm.to_string(),
359 old_threshold,
360 new_threshold,
361 });
362}
363
#[cfg(test)]
mod tests {
    use super::*;

    /// An emitted event gets a non-zero id and shows up among the latest events.
    #[test]
    fn emit_returns_positive_id() {
        let kind = EventKind::ToolCall {
            tool: "ctx_read".to_string(),
            tokens_original: 1000,
            tokens_saved: 800,
            mode: Some("map".to_string()),
            duration_ms: 5,
            path: Some("src/main.rs".to_string()),
        };
        let id = emit(kind);
        assert!(id > 0);

        let recent = latest_events(100);
        assert!(recent.iter().any(|event| event.id == id));
    }

    /// `events_since` returns only events newer than the given id.
    #[test]
    fn events_since_filters_correctly() {
        let first = emit(EventKind::CacheHit {
            path: "filter_test_a.rs".to_string(),
            saved_tokens: 100,
        });
        let second = emit(EventKind::CacheHit {
            path: "filter_test_b.rs".to_string(),
            saved_tokens: 200,
        });

        let newer = events_since(first);
        assert!(newer.iter().any(|event| event.id == second));
        assert!(newer.iter().all(|event| event.id > first));
    }
}