1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
/// Maximum number of events kept in the in-memory ring buffer; once the ring
/// is full, the oldest event is dropped for every new one.
const RING_CAPACITY: usize = 1_000;
/// Line count at which `events.jsonl` is moved aside to `events.jsonl.old`
/// before the next event is appended.
const JSONL_MAX_LINES: usize = 10_000;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
10pub struct LeanCtxEvent {
11 pub id: u64,
12 pub timestamp: String,
13 pub kind: EventKind,
14}
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17#[serde(tag = "type")]
18pub enum EventKind {
19 ToolCall {
20 tool: String,
21 tokens_original: u64,
22 tokens_saved: u64,
23 mode: Option<String>,
24 duration_ms: u64,
25 path: Option<String>,
26 },
27 CacheHit {
28 path: String,
29 saved_tokens: u64,
30 },
31 Compression {
32 path: String,
33 before_lines: u32,
34 after_lines: u32,
35 strategy: String,
36 kept_line_count: u32,
37 removed_line_count: u32,
38 },
39 AgentAction {
40 agent_id: String,
41 action: String,
42 tool: Option<String>,
43 },
44 KnowledgeUpdate {
45 category: String,
46 key: String,
47 action: String,
48 },
49 ThresholdShift {
50 language: String,
51 old_entropy: f64,
52 new_entropy: f64,
53 old_jaccard: f64,
54 new_jaccard: f64,
55 },
56}
57
58struct EventBus {
59 seq: AtomicU64,
60 ring: Mutex<VecDeque<LeanCtxEvent>>,
61}
62
63impl EventBus {
64 fn new() -> Self {
65 Self {
66 seq: AtomicU64::new(0),
67 ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
68 }
69 }
70
71 fn emit(&self, kind: EventKind) -> u64 {
72 let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
73 let event = LeanCtxEvent {
74 id,
75 timestamp: chrono::Local::now()
76 .format("%Y-%m-%dT%H:%M:%S%.3f")
77 .to_string(),
78 kind,
79 };
80
81 {
82 let mut ring = self
83 .ring
84 .lock()
85 .unwrap_or_else(std::sync::PoisonError::into_inner);
86 if ring.len() >= RING_CAPACITY {
87 ring.pop_front();
88 }
89 ring.push_back(event.clone());
90 }
91
92 append_jsonl(&event);
93 id
94 }
95
96 fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
97 let ring = self
98 .ring
99 .lock()
100 .unwrap_or_else(std::sync::PoisonError::into_inner);
101 ring.iter().filter(|e| e.id > after_id).cloned().collect()
102 }
103
104 fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
105 let ring = self
106 .ring
107 .lock()
108 .unwrap_or_else(std::sync::PoisonError::into_inner);
109 let len = ring.len();
110 let start = len.saturating_sub(n);
111 ring.iter().skip(start).cloned().collect()
112 }
113}
114
115fn bus() -> &'static EventBus {
116 static INSTANCE: OnceLock<EventBus> = OnceLock::new();
117 INSTANCE.get_or_init(EventBus::new)
118}
119
120fn jsonl_path() -> Option<std::path::PathBuf> {
121 crate::core::data_dir::lean_ctx_data_dir()
122 .ok()
123 .map(|d| d.join("events.jsonl"))
124}
125
126fn append_jsonl(event: &LeanCtxEvent) {
127 let Some(path) = jsonl_path() else { return };
128
129 if let Some(parent) = path.parent() {
130 let _ = std::fs::create_dir_all(parent);
131 }
132
133 if let Ok(content) = std::fs::read_to_string(&path) {
134 let lines = content.lines().count();
135 if lines >= JSONL_MAX_LINES {
136 let old = path.with_extension("jsonl.old");
137 let _ = std::fs::remove_file(&old);
138 let _ = std::fs::rename(&path, &old);
139 }
140 }
141
142 if let Ok(json) = serde_json::to_string(event) {
143 use std::io::Write;
144 if let Ok(mut f) = std::fs::OpenOptions::new()
145 .create(true)
146 .append(true)
147 .open(&path)
148 {
149 let _ = writeln!(f, "{json}");
150 }
151 }
152}
153
154pub fn emit(kind: EventKind) -> u64 {
157 bus().emit(kind)
158}
159
160pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
161 bus().events_since(after_id)
162}
163
164pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
165 bus().latest_events(n)
166}
167
168pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
169 let Some(path) = jsonl_path() else {
170 return Vec::new();
171 };
172 let Ok(content) = std::fs::read_to_string(&path) else {
173 return Vec::new();
174 };
175 let all: Vec<LeanCtxEvent> = content
176 .lines()
177 .filter(|l| !l.trim().is_empty())
178 .filter_map(|l| serde_json::from_str(l).ok())
179 .collect();
180 let start = all.len().saturating_sub(n);
181 all[start..].to_vec()
182}
183
184pub fn emit_tool_call(
185 tool: &str,
186 tokens_original: u64,
187 tokens_saved: u64,
188 mode: Option<String>,
189 duration_ms: u64,
190 path: Option<String>,
191) {
192 emit(EventKind::ToolCall {
193 tool: tool.to_string(),
194 tokens_original,
195 tokens_saved,
196 mode,
197 duration_ms,
198 path,
199 });
200}
201
202pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
203 emit(EventKind::CacheHit {
204 path: path.to_string(),
205 saved_tokens,
206 });
207}
208
209pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
210 emit(EventKind::AgentAction {
211 agent_id: agent_id.to_string(),
212 action: action.to_string(),
213 tool: tool.map(std::string::ToString::to_string),
214 });
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// An emitted event gets a non-zero id and is visible among the latest
    /// events of the in-memory ring.
    #[test]
    fn emit_returns_positive_id() {
        let kind = EventKind::ToolCall {
            tool: "ctx_read".to_string(),
            tokens_original: 1000,
            tokens_saved: 800,
            mode: Some("map".to_string()),
            duration_ms: 5,
            path: Some("src/main.rs".to_string()),
        };

        let id = emit(kind);
        assert!(id > 0);

        let recent = latest_events(100);
        assert!(recent.iter().any(|event| event.id == id));
    }

    /// `events_since` returns the later event and nothing at or below the
    /// cursor id.
    #[test]
    fn events_since_filters_correctly() {
        let cache_hit = |path: &str, saved_tokens: u64| EventKind::CacheHit {
            path: path.to_string(),
            saved_tokens,
        };

        let first_id = emit(cache_hit("filter_test_a.rs", 100));
        let second_id = emit(cache_hit("filter_test_b.rs", 200));

        let newer = events_since(first_id);
        assert!(newer.iter().any(|event| event.id == second_id));
        assert!(newer.iter().all(|event| event.id > first_id));
    }
}