Skip to main content

lean_ctx/core/
events.rs

1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
/// Maximum number of events kept in the in-memory ring; once this many are held,
/// the oldest one is dropped before a new one is pushed.
const RING_CAPACITY: usize = 1000;
/// Line count at which `events.jsonl` is rotated to `events.jsonl.old` before the
/// next event is appended.
const JSONL_MAX_LINES: usize = 10_000;
8
/// A single recorded event: a sequence id, an emission timestamp, and the payload.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LeanCtxEvent {
    /// Sequence number assigned by the event bus; the first emitted event gets 1.
    pub id: u64,
    /// Local wall-clock time of emission, formatted as `%Y-%m-%dT%H:%M:%S%.3f`
    /// (millisecond precision, no UTC offset).
    pub timestamp: String,
    /// What happened.
    pub kind: EventKind,
}
15
/// Payload of a [`LeanCtxEvent`].
///
/// Serialized by serde as an internally tagged enum: the variant name is stored
/// under the `"type"` key alongside the variant's own fields.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum EventKind {
    /// A tool invocation; constructed by [`emit_tool_call`].
    ToolCall {
        tool: String,
        tokens_original: u64,
        tokens_saved: u64,
        mode: Option<String>,
        duration_ms: u64,
        path: Option<String>,
    },
    /// A cache hit for `path`; constructed by [`emit_cache_hit`].
    CacheHit {
        path: String,
        saved_tokens: u64,
    },
    /// Outcome of compressing `path`.
    ///
    /// NOTE(review): no constructor helper is visible in this file; how
    /// `after_lines` relates to `kept_line_count` should be confirmed with the producer.
    Compression {
        path: String,
        before_lines: u32,
        after_lines: u32,
        strategy: String,
        kept_line_count: u32,
        removed_line_count: u32,
    },
    /// An action taken by an agent; constructed by [`emit_agent_action`].
    AgentAction {
        agent_id: String,
        action: String,
        tool: Option<String>,
    },
    /// A change to a knowledge entry identified by `category` and `key`.
    ///
    /// NOTE(review): the set of valid `action` values is not visible here.
    KnowledgeUpdate {
        category: String,
        key: String,
        action: String,
    },
    /// Old and new entropy / Jaccard values for `language`.
    ///
    /// NOTE(review): presumably these are compression thresholds — confirm with the producer.
    ThresholdShift {
        language: String,
        old_entropy: f64,
        new_entropy: f64,
        old_jaccard: f64,
        new_jaccard: f64,
    },
    /// A budget nearing its limit; constructed by [`emit_budget_warning`].
    /// `used` and `limit` are carried as pre-formatted strings.
    BudgetWarning {
        role: String,
        dimension: String,
        used: String,
        limit: String,
        percent: u8,
    },
    /// A budget that has been used up; constructed by [`emit_budget_exhausted`].
    /// `used` and `limit` are carried as pre-formatted strings.
    BudgetExhausted {
        role: String,
        dimension: String,
        used: String,
        limit: String,
    },
    /// A policy violation by `role` involving `tool`; constructed by
    /// [`emit_policy_violation`].
    PolicyViolation {
        role: String,
        tool: String,
        reason: String,
    },
    /// A role switch; constructed by [`emit_role_changed`].
    RoleChanged {
        from: String,
        to: String,
    },
    /// A profile switch; constructed by [`emit_profile_changed`].
    ProfileChanged {
        from: String,
        to: String,
    },
    /// An SLO breach, carrying the threshold and the actual value; constructed by
    /// [`emit_slo_violation`].
    SloViolation {
        slo_name: String,
        metric: String,
        threshold: f64,
        actual: f64,
        action: String,
    },
    /// A metric that deviated from its expected value; constructed by [`emit_anomaly`].
    Anomaly {
        metric: String,
        expected: f64,
        actual: f64,
        deviation_factor: f64,
    },
    /// A verification warning; constructed by [`emit_verification_warning`].
    VerificationWarning {
        warning_kind: String,
        detail: String,
        severity: String,
    },
    /// A threshold change for `language` and `arm`; constructed by
    /// [`emit_threshold_adapted`].
    ThresholdAdapted {
        language: String,
        arm: String,
        old_threshold: f64,
        new_threshold: f64,
    },
}
107
/// Event sink holding an id counter and a bounded in-memory ring of recent events.
struct EventBus {
    /// Source of event ids; incremented once per emitted event.
    seq: AtomicU64,
    /// Recent events, oldest at the front, capped at `RING_CAPACITY` entries.
    ring: Mutex<VecDeque<LeanCtxEvent>>,
}
112
113impl EventBus {
114    fn new() -> Self {
115        Self {
116            seq: AtomicU64::new(0),
117            ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
118        }
119    }
120
121    fn emit(&self, kind: EventKind) -> u64 {
122        let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
123        let event = LeanCtxEvent {
124            id,
125            timestamp: chrono::Local::now()
126                .format("%Y-%m-%dT%H:%M:%S%.3f")
127                .to_string(),
128            kind,
129        };
130
131        {
132            let mut ring = self
133                .ring
134                .lock()
135                .unwrap_or_else(std::sync::PoisonError::into_inner);
136            if ring.len() >= RING_CAPACITY {
137                ring.pop_front();
138            }
139            ring.push_back(event.clone());
140        }
141
142        append_jsonl(&event);
143        id
144    }
145
146    fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
147        let ring = self
148            .ring
149            .lock()
150            .unwrap_or_else(std::sync::PoisonError::into_inner);
151        ring.iter().filter(|e| e.id > after_id).cloned().collect()
152    }
153
154    fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
155        let ring = self
156            .ring
157            .lock()
158            .unwrap_or_else(std::sync::PoisonError::into_inner);
159        let len = ring.len();
160        let start = len.saturating_sub(n);
161        ring.iter().skip(start).cloned().collect()
162    }
163}
164
165fn bus() -> &'static EventBus {
166    static INSTANCE: OnceLock<EventBus> = OnceLock::new();
167    INSTANCE.get_or_init(EventBus::new)
168}
169
170fn jsonl_path() -> Option<std::path::PathBuf> {
171    crate::core::data_dir::lean_ctx_data_dir()
172        .ok()
173        .map(|d| d.join("events.jsonl"))
174}
175
/// Reports whether event persistence should be skipped because the process looks
/// like a test run. The answer is computed once and cached for the process lifetime.
///
/// Any of the following counts as a test environment:
/// - the crate was compiled with `cfg(test)`;
/// - the `__LEAN_CTX_SKIP_EVENTS` environment variable is set (to any value);
/// - the current executable's path contains a `deps` directory component
///   (`/deps/` or `\deps\`).
fn is_test_environment() -> bool {
    static DETECTED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();

    /// True when the running executable's path contains a `deps` directory component.
    fn running_from_deps_dir() -> bool {
        match std::env::current_exe() {
            Ok(exe) => {
                let exe = exe.to_string_lossy();
                exe.contains("/deps/") || exe.contains("\\deps\\")
            }
            Err(_) => false,
        }
    }

    *DETECTED.get_or_init(|| {
        cfg!(test)
            || std::env::var_os("__LEAN_CTX_SKIP_EVENTS").is_some()
            || running_from_deps_dir()
    })
}
192
193fn append_jsonl(event: &LeanCtxEvent) {
194    if is_test_environment() {
195        return;
196    }
197    let Some(path) = jsonl_path() else { return };
198
199    if let Some(parent) = path.parent() {
200        let _ = std::fs::create_dir_all(parent);
201    }
202
203    if let Ok(content) = std::fs::read_to_string(&path) {
204        let lines = content.lines().count();
205        if lines >= JSONL_MAX_LINES {
206            let old = path.with_extension("jsonl.old");
207            let _ = std::fs::remove_file(&old);
208            let _ = std::fs::rename(&path, &old);
209        }
210    }
211
212    if let Ok(json) = serde_json::to_string(event) {
213        use std::io::Write;
214        if let Ok(mut f) = std::fs::OpenOptions::new()
215            .create(true)
216            .append(true)
217            .open(&path)
218        {
219            let _ = writeln!(f, "{json}");
220        }
221    }
222}
223
224// --- Public API ---
225
226pub fn emit(kind: EventKind) -> u64 {
227    bus().emit(kind)
228}
229
230pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
231    bus().events_since(after_id)
232}
233
234pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
235    bus().latest_events(n)
236}
237
238pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
239    let Some(path) = jsonl_path() else {
240        return Vec::new();
241    };
242    let Ok(content) = std::fs::read_to_string(&path) else {
243        return Vec::new();
244    };
245    let all: Vec<LeanCtxEvent> = content
246        .lines()
247        .filter(|l| !l.trim().is_empty())
248        .filter_map(|l| serde_json::from_str(l).ok())
249        .collect();
250    let start = all.len().saturating_sub(n);
251    all[start..].to_vec()
252}
253
254pub fn emit_tool_call(
255    tool: &str,
256    tokens_original: u64,
257    tokens_saved: u64,
258    mode: Option<String>,
259    duration_ms: u64,
260    path: Option<String>,
261) {
262    emit(EventKind::ToolCall {
263        tool: tool.to_string(),
264        tokens_original,
265        tokens_saved,
266        mode,
267        duration_ms,
268        path,
269    });
270}
271
272pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
273    emit(EventKind::CacheHit {
274        path: path.to_string(),
275        saved_tokens,
276    });
277}
278
279pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
280    emit(EventKind::AgentAction {
281        agent_id: agent_id.to_string(),
282        action: action.to_string(),
283        tool: tool.map(std::string::ToString::to_string),
284    });
285}
286
287pub fn emit_budget_warning(role: &str, dimension: &str, used: &str, limit: &str, percent: u8) {
288    emit(EventKind::BudgetWarning {
289        role: role.to_string(),
290        dimension: dimension.to_string(),
291        used: used.to_string(),
292        limit: limit.to_string(),
293        percent,
294    });
295}
296
297pub fn emit_budget_exhausted(role: &str, dimension: &str, used: &str, limit: &str) {
298    emit(EventKind::BudgetExhausted {
299        role: role.to_string(),
300        dimension: dimension.to_string(),
301        used: used.to_string(),
302        limit: limit.to_string(),
303    });
304}
305
306pub fn emit_policy_violation(role: &str, tool: &str, reason: &str) {
307    emit(EventKind::PolicyViolation {
308        role: role.to_string(),
309        tool: tool.to_string(),
310        reason: reason.to_string(),
311    });
312}
313
314pub fn emit_role_changed(from: &str, to: &str) {
315    emit(EventKind::RoleChanged {
316        from: from.to_string(),
317        to: to.to_string(),
318    });
319}
320
321pub fn emit_profile_changed(from: &str, to: &str) {
322    emit(EventKind::ProfileChanged {
323        from: from.to_string(),
324        to: to.to_string(),
325    });
326}
327
328pub fn emit_slo_violation(slo_name: &str, metric: &str, threshold: f64, actual: f64, action: &str) {
329    emit(EventKind::SloViolation {
330        slo_name: slo_name.to_string(),
331        metric: metric.to_string(),
332        threshold,
333        actual,
334        action: action.to_string(),
335    });
336}
337
338pub fn emit_anomaly(metric: &str, expected: f64, actual: f64, deviation_factor: f64) {
339    emit(EventKind::Anomaly {
340        metric: metric.to_string(),
341        expected,
342        actual,
343        deviation_factor,
344    });
345}
346
347pub fn emit_verification_warning(warning_kind: &str, detail: &str, severity: &str) {
348    emit(EventKind::VerificationWarning {
349        warning_kind: warning_kind.to_string(),
350        detail: detail.to_string(),
351        severity: severity.to_string(),
352    });
353}
354
355pub fn emit_threshold_adapted(language: &str, arm: &str, old_threshold: f64, new_threshold: f64) {
356    emit(EventKind::ThresholdAdapted {
357        language: language.to_string(),
358        arm: arm.to_string(),
359        old_threshold,
360        new_threshold,
361    });
362}
363
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `CacheHit` payload for the given path and token count.
    fn cache_hit(path: &str, saved_tokens: u64) -> EventKind {
        EventKind::CacheHit {
            path: path.to_string(),
            saved_tokens,
        }
    }

    /// An emitted event gets a non-zero id and is visible among the latest events.
    #[test]
    fn emit_returns_positive_id() {
        let kind = EventKind::ToolCall {
            tool: "ctx_read".to_string(),
            tokens_original: 1000,
            tokens_saved: 800,
            mode: Some("map".to_string()),
            duration_ms: 5,
            path: Some("src/main.rs".to_string()),
        };
        let id = emit(kind);
        assert!(id > 0);

        let recent = latest_events(100);
        assert!(recent.iter().any(|e| e.id == id));
    }

    /// `events_since` returns the later event and nothing at or before the cutoff id.
    #[test]
    fn events_since_filters_correctly() {
        let first = emit(cache_hit("filter_test_a.rs", 100));
        let second = emit(cache_hit("filter_test_b.rs", 200));

        let newer = events_since(first);
        assert!(newer.iter().any(|e| e.id == second));
        assert!(newer.iter().all(|e| e.id > first));
    }
}