Skip to main content

lean_ctx/core/
events.rs

1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
/// Maximum number of events kept in the in-memory ring buffer; once full,
/// the oldest event is dropped for each new one.
const RING_CAPACITY: usize = 1_000;
/// Line count at which the on-disk `events.jsonl` log is rotated to
/// `events.jsonl.old` before the next append.
const JSONL_MAX_LINES: usize = 10_000;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
10pub struct LeanCtxEvent {
11    pub id: u64,
12    pub timestamp: String,
13    pub kind: EventKind,
14}
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17#[serde(tag = "type")]
18pub enum EventKind {
19    ToolCall {
20        tool: String,
21        tokens_original: u64,
22        tokens_saved: u64,
23        mode: Option<String>,
24        duration_ms: u64,
25        path: Option<String>,
26    },
27    CacheHit {
28        path: String,
29        saved_tokens: u64,
30    },
31    Compression {
32        path: String,
33        before_lines: u32,
34        after_lines: u32,
35        strategy: String,
36        kept_line_count: u32,
37        removed_line_count: u32,
38    },
39    AgentAction {
40        agent_id: String,
41        action: String,
42        tool: Option<String>,
43    },
44    KnowledgeUpdate {
45        category: String,
46        key: String,
47        action: String,
48    },
49    ThresholdShift {
50        language: String,
51        old_entropy: f64,
52        new_entropy: f64,
53        old_jaccard: f64,
54        new_jaccard: f64,
55    },
56    BudgetWarning {
57        role: String,
58        dimension: String,
59        used: String,
60        limit: String,
61        percent: u8,
62    },
63    BudgetExhausted {
64        role: String,
65        dimension: String,
66        used: String,
67        limit: String,
68    },
69    PolicyViolation {
70        role: String,
71        tool: String,
72        reason: String,
73    },
74    RoleChanged {
75        from: String,
76        to: String,
77    },
78}
79
80struct EventBus {
81    seq: AtomicU64,
82    ring: Mutex<VecDeque<LeanCtxEvent>>,
83}
84
85impl EventBus {
86    fn new() -> Self {
87        Self {
88            seq: AtomicU64::new(0),
89            ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
90        }
91    }
92
93    fn emit(&self, kind: EventKind) -> u64 {
94        let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
95        let event = LeanCtxEvent {
96            id,
97            timestamp: chrono::Local::now()
98                .format("%Y-%m-%dT%H:%M:%S%.3f")
99                .to_string(),
100            kind,
101        };
102
103        {
104            let mut ring = self
105                .ring
106                .lock()
107                .unwrap_or_else(std::sync::PoisonError::into_inner);
108            if ring.len() >= RING_CAPACITY {
109                ring.pop_front();
110            }
111            ring.push_back(event.clone());
112        }
113
114        append_jsonl(&event);
115        id
116    }
117
118    fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
119        let ring = self
120            .ring
121            .lock()
122            .unwrap_or_else(std::sync::PoisonError::into_inner);
123        ring.iter().filter(|e| e.id > after_id).cloned().collect()
124    }
125
126    fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
127        let ring = self
128            .ring
129            .lock()
130            .unwrap_or_else(std::sync::PoisonError::into_inner);
131        let len = ring.len();
132        let start = len.saturating_sub(n);
133        ring.iter().skip(start).cloned().collect()
134    }
135}
136
137fn bus() -> &'static EventBus {
138    static INSTANCE: OnceLock<EventBus> = OnceLock::new();
139    INSTANCE.get_or_init(EventBus::new)
140}
141
142fn jsonl_path() -> Option<std::path::PathBuf> {
143    crate::core::data_dir::lean_ctx_data_dir()
144        .ok()
145        .map(|d| d.join("events.jsonl"))
146}
147
148fn append_jsonl(event: &LeanCtxEvent) {
149    let Some(path) = jsonl_path() else { return };
150
151    if let Some(parent) = path.parent() {
152        let _ = std::fs::create_dir_all(parent);
153    }
154
155    if let Ok(content) = std::fs::read_to_string(&path) {
156        let lines = content.lines().count();
157        if lines >= JSONL_MAX_LINES {
158            let old = path.with_extension("jsonl.old");
159            let _ = std::fs::remove_file(&old);
160            let _ = std::fs::rename(&path, &old);
161        }
162    }
163
164    if let Ok(json) = serde_json::to_string(event) {
165        use std::io::Write;
166        if let Ok(mut f) = std::fs::OpenOptions::new()
167            .create(true)
168            .append(true)
169            .open(&path)
170        {
171            let _ = writeln!(f, "{json}");
172        }
173    }
174}
175
176// --- Public API ---
177
178pub fn emit(kind: EventKind) -> u64 {
179    bus().emit(kind)
180}
181
182pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
183    bus().events_since(after_id)
184}
185
186pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
187    bus().latest_events(n)
188}
189
190pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
191    let Some(path) = jsonl_path() else {
192        return Vec::new();
193    };
194    let Ok(content) = std::fs::read_to_string(&path) else {
195        return Vec::new();
196    };
197    let all: Vec<LeanCtxEvent> = content
198        .lines()
199        .filter(|l| !l.trim().is_empty())
200        .filter_map(|l| serde_json::from_str(l).ok())
201        .collect();
202    let start = all.len().saturating_sub(n);
203    all[start..].to_vec()
204}
205
206pub fn emit_tool_call(
207    tool: &str,
208    tokens_original: u64,
209    tokens_saved: u64,
210    mode: Option<String>,
211    duration_ms: u64,
212    path: Option<String>,
213) {
214    emit(EventKind::ToolCall {
215        tool: tool.to_string(),
216        tokens_original,
217        tokens_saved,
218        mode,
219        duration_ms,
220        path,
221    });
222}
223
224pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
225    emit(EventKind::CacheHit {
226        path: path.to_string(),
227        saved_tokens,
228    });
229}
230
231pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
232    emit(EventKind::AgentAction {
233        agent_id: agent_id.to_string(),
234        action: action.to_string(),
235        tool: tool.map(std::string::ToString::to_string),
236    });
237}
238
239pub fn emit_budget_warning(role: &str, dimension: &str, used: &str, limit: &str, percent: u8) {
240    emit(EventKind::BudgetWarning {
241        role: role.to_string(),
242        dimension: dimension.to_string(),
243        used: used.to_string(),
244        limit: limit.to_string(),
245        percent,
246    });
247}
248
249pub fn emit_budget_exhausted(role: &str, dimension: &str, used: &str, limit: &str) {
250    emit(EventKind::BudgetExhausted {
251        role: role.to_string(),
252        dimension: dimension.to_string(),
253        used: used.to_string(),
254        limit: limit.to_string(),
255    });
256}
257
258pub fn emit_policy_violation(role: &str, tool: &str, reason: &str) {
259    emit(EventKind::PolicyViolation {
260        role: role.to_string(),
261        tool: tool.to_string(),
262        reason: reason.to_string(),
263    });
264}
265
266pub fn emit_role_changed(from: &str, to: &str) {
267    emit(EventKind::RoleChanged {
268        from: from.to_string(),
269        to: to.to_string(),
270    });
271}
272
#[cfg(test)]
mod tests {
    use super::*;

    /// An emitted event gets a non-zero id and is visible via `latest_events`.
    #[test]
    fn emit_returns_positive_id() {
        let kind = EventKind::ToolCall {
            tool: "ctx_read".to_string(),
            tokens_original: 1000,
            tokens_saved: 800,
            mode: Some("map".to_string()),
            duration_ms: 5,
            path: Some("src/main.rs".to_string()),
        };
        let new_id = emit(kind);
        assert!(new_id > 0);

        let recent = latest_events(100);
        assert!(recent.iter().any(|event| event.id == new_id));
    }

    /// `events_since(id)` includes later events and nothing at or before `id`.
    #[test]
    fn events_since_filters_correctly() {
        let first_id = emit(EventKind::CacheHit {
            path: "filter_test_a.rs".to_string(),
            saved_tokens: 100,
        });
        let second_id = emit(EventKind::CacheHit {
            path: "filter_test_b.rs".to_string(),
            saved_tokens: 200,
        });

        let newer = events_since(first_id);
        assert!(newer.iter().any(|event| event.id == second_id));
        assert!(newer.iter().all(|event| event.id > first_id));
    }
}
307}