1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
6const RING_CAPACITY: usize = 1000;
7const JSONL_MAX_LINES: usize = 10_000;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
10pub struct LeanCtxEvent {
11 pub id: u64,
12 pub timestamp: String,
13 pub kind: EventKind,
14}
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17#[serde(tag = "type")]
18pub enum EventKind {
19 ToolCall {
20 tool: String,
21 tokens_original: u64,
22 tokens_saved: u64,
23 mode: Option<String>,
24 duration_ms: u64,
25 path: Option<String>,
26 },
27 CacheHit {
28 path: String,
29 saved_tokens: u64,
30 },
31 Compression {
32 path: String,
33 before_lines: u32,
34 after_lines: u32,
35 strategy: String,
36 kept_line_count: u32,
37 removed_line_count: u32,
38 },
39 AgentAction {
40 agent_id: String,
41 action: String,
42 tool: Option<String>,
43 },
44 KnowledgeUpdate {
45 category: String,
46 key: String,
47 action: String,
48 },
49 ThresholdShift {
50 language: String,
51 old_entropy: f64,
52 new_entropy: f64,
53 old_jaccard: f64,
54 new_jaccard: f64,
55 },
56 BudgetWarning {
57 role: String,
58 dimension: String,
59 used: String,
60 limit: String,
61 percent: u8,
62 },
63 BudgetExhausted {
64 role: String,
65 dimension: String,
66 used: String,
67 limit: String,
68 },
69 PolicyViolation {
70 role: String,
71 tool: String,
72 reason: String,
73 },
74 RoleChanged {
75 from: String,
76 to: String,
77 },
78}
79
80struct EventBus {
81 seq: AtomicU64,
82 ring: Mutex<VecDeque<LeanCtxEvent>>,
83}
84
85impl EventBus {
86 fn new() -> Self {
87 Self {
88 seq: AtomicU64::new(0),
89 ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
90 }
91 }
92
93 fn emit(&self, kind: EventKind) -> u64 {
94 let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
95 let event = LeanCtxEvent {
96 id,
97 timestamp: chrono::Local::now()
98 .format("%Y-%m-%dT%H:%M:%S%.3f")
99 .to_string(),
100 kind,
101 };
102
103 {
104 let mut ring = self
105 .ring
106 .lock()
107 .unwrap_or_else(std::sync::PoisonError::into_inner);
108 if ring.len() >= RING_CAPACITY {
109 ring.pop_front();
110 }
111 ring.push_back(event.clone());
112 }
113
114 append_jsonl(&event);
115 id
116 }
117
118 fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
119 let ring = self
120 .ring
121 .lock()
122 .unwrap_or_else(std::sync::PoisonError::into_inner);
123 ring.iter().filter(|e| e.id > after_id).cloned().collect()
124 }
125
126 fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
127 let ring = self
128 .ring
129 .lock()
130 .unwrap_or_else(std::sync::PoisonError::into_inner);
131 let len = ring.len();
132 let start = len.saturating_sub(n);
133 ring.iter().skip(start).cloned().collect()
134 }
135}
136
137fn bus() -> &'static EventBus {
138 static INSTANCE: OnceLock<EventBus> = OnceLock::new();
139 INSTANCE.get_or_init(EventBus::new)
140}
141
142fn jsonl_path() -> Option<std::path::PathBuf> {
143 crate::core::data_dir::lean_ctx_data_dir()
144 .ok()
145 .map(|d| d.join("events.jsonl"))
146}
147
148fn append_jsonl(event: &LeanCtxEvent) {
149 let Some(path) = jsonl_path() else { return };
150
151 if let Some(parent) = path.parent() {
152 let _ = std::fs::create_dir_all(parent);
153 }
154
155 if let Ok(content) = std::fs::read_to_string(&path) {
156 let lines = content.lines().count();
157 if lines >= JSONL_MAX_LINES {
158 let old = path.with_extension("jsonl.old");
159 let _ = std::fs::remove_file(&old);
160 let _ = std::fs::rename(&path, &old);
161 }
162 }
163
164 if let Ok(json) = serde_json::to_string(event) {
165 use std::io::Write;
166 if let Ok(mut f) = std::fs::OpenOptions::new()
167 .create(true)
168 .append(true)
169 .open(&path)
170 {
171 let _ = writeln!(f, "{json}");
172 }
173 }
174}
175
176pub fn emit(kind: EventKind) -> u64 {
179 bus().emit(kind)
180}
181
182pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
183 bus().events_since(after_id)
184}
185
186pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
187 bus().latest_events(n)
188}
189
190pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
191 let Some(path) = jsonl_path() else {
192 return Vec::new();
193 };
194 let Ok(content) = std::fs::read_to_string(&path) else {
195 return Vec::new();
196 };
197 let all: Vec<LeanCtxEvent> = content
198 .lines()
199 .filter(|l| !l.trim().is_empty())
200 .filter_map(|l| serde_json::from_str(l).ok())
201 .collect();
202 let start = all.len().saturating_sub(n);
203 all[start..].to_vec()
204}
205
206pub fn emit_tool_call(
207 tool: &str,
208 tokens_original: u64,
209 tokens_saved: u64,
210 mode: Option<String>,
211 duration_ms: u64,
212 path: Option<String>,
213) {
214 emit(EventKind::ToolCall {
215 tool: tool.to_string(),
216 tokens_original,
217 tokens_saved,
218 mode,
219 duration_ms,
220 path,
221 });
222}
223
224pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
225 emit(EventKind::CacheHit {
226 path: path.to_string(),
227 saved_tokens,
228 });
229}
230
231pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
232 emit(EventKind::AgentAction {
233 agent_id: agent_id.to_string(),
234 action: action.to_string(),
235 tool: tool.map(std::string::ToString::to_string),
236 });
237}
238
239pub fn emit_budget_warning(role: &str, dimension: &str, used: &str, limit: &str, percent: u8) {
240 emit(EventKind::BudgetWarning {
241 role: role.to_string(),
242 dimension: dimension.to_string(),
243 used: used.to_string(),
244 limit: limit.to_string(),
245 percent,
246 });
247}
248
249pub fn emit_budget_exhausted(role: &str, dimension: &str, used: &str, limit: &str) {
250 emit(EventKind::BudgetExhausted {
251 role: role.to_string(),
252 dimension: dimension.to_string(),
253 used: used.to_string(),
254 limit: limit.to_string(),
255 });
256}
257
258pub fn emit_policy_violation(role: &str, tool: &str, reason: &str) {
259 emit(EventKind::PolicyViolation {
260 role: role.to_string(),
261 tool: tool.to_string(),
262 reason: reason.to_string(),
263 });
264}
265
266pub fn emit_role_changed(from: &str, to: &str) {
267 emit(EventKind::RoleChanged {
268 from: from.to_string(),
269 to: to.to_string(),
270 });
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276
277 #[test]
278 fn emit_returns_positive_id() {
279 let id = emit(EventKind::ToolCall {
280 tool: "ctx_read".to_string(),
281 tokens_original: 1000,
282 tokens_saved: 800,
283 mode: Some("map".to_string()),
284 duration_ms: 5,
285 path: Some("src/main.rs".to_string()),
286 });
287 assert!(id > 0);
288 let events = latest_events(100);
289 assert!(events.iter().any(|e| e.id == id));
290 }
291
292 #[test]
293 fn events_since_filters_correctly() {
294 let id1 = emit(EventKind::CacheHit {
295 path: "filter_test_a.rs".to_string(),
296 saved_tokens: 100,
297 });
298 let id2 = emit(EventKind::CacheHit {
299 path: "filter_test_b.rs".to_string(),
300 saved_tokens: 200,
301 });
302
303 let after = events_since(id1);
304 assert!(after.iter().any(|e| e.id == id2));
305 assert!(after.iter().all(|e| e.id > id1));
306 }
307}