Skip to main content

lean_ctx/core/
events.rs

1use serde::{Deserialize, Serialize};
2use std::collections::VecDeque;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
6const RING_CAPACITY: usize = 1000;
7const JSONL_MAX_LINES: usize = 10_000;
8
9#[derive(Clone, Debug, Serialize, Deserialize)]
10pub struct LeanCtxEvent {
11    pub id: u64,
12    pub timestamp: String,
13    pub kind: EventKind,
14}
15
16#[derive(Clone, Debug, Serialize, Deserialize)]
17#[serde(tag = "type")]
18pub enum EventKind {
19    ToolCall {
20        tool: String,
21        tokens_original: u64,
22        tokens_saved: u64,
23        mode: Option<String>,
24        duration_ms: u64,
25        path: Option<String>,
26    },
27    CacheHit {
28        path: String,
29        saved_tokens: u64,
30    },
31    Compression {
32        path: String,
33        before_lines: u32,
34        after_lines: u32,
35        strategy: String,
36        kept_line_count: u32,
37        removed_line_count: u32,
38    },
39    AgentAction {
40        agent_id: String,
41        action: String,
42        tool: Option<String>,
43    },
44    KnowledgeUpdate {
45        category: String,
46        key: String,
47        action: String,
48    },
49    ThresholdShift {
50        language: String,
51        old_entropy: f64,
52        new_entropy: f64,
53        old_jaccard: f64,
54        new_jaccard: f64,
55    },
56}
57
58struct EventBus {
59    seq: AtomicU64,
60    ring: Mutex<VecDeque<LeanCtxEvent>>,
61}
62
63impl EventBus {
64    fn new() -> Self {
65        Self {
66            seq: AtomicU64::new(0),
67            ring: Mutex::new(VecDeque::with_capacity(RING_CAPACITY)),
68        }
69    }
70
71    fn emit(&self, kind: EventKind) -> u64 {
72        let id = self.seq.fetch_add(1, Ordering::Relaxed) + 1;
73        let event = LeanCtxEvent {
74            id,
75            timestamp: chrono::Local::now()
76                .format("%Y-%m-%dT%H:%M:%S%.3f")
77                .to_string(),
78            kind,
79        };
80
81        {
82            let mut ring = self.ring.lock().unwrap_or_else(|e| e.into_inner());
83            if ring.len() >= RING_CAPACITY {
84                ring.pop_front();
85            }
86            ring.push_back(event.clone());
87        }
88
89        append_jsonl(&event);
90        id
91    }
92
93    fn events_since(&self, after_id: u64) -> Vec<LeanCtxEvent> {
94        let ring = self.ring.lock().unwrap_or_else(|e| e.into_inner());
95        ring.iter().filter(|e| e.id > after_id).cloned().collect()
96    }
97
98    fn latest_events(&self, n: usize) -> Vec<LeanCtxEvent> {
99        let ring = self.ring.lock().unwrap_or_else(|e| e.into_inner());
100        let len = ring.len();
101        let start = len.saturating_sub(n);
102        ring.iter().skip(start).cloned().collect()
103    }
104}
105
106fn bus() -> &'static EventBus {
107    static INSTANCE: OnceLock<EventBus> = OnceLock::new();
108    INSTANCE.get_or_init(EventBus::new)
109}
110
111fn jsonl_path() -> Option<std::path::PathBuf> {
112    crate::core::data_dir::lean_ctx_data_dir()
113        .ok()
114        .map(|d| d.join("events.jsonl"))
115}
116
117fn append_jsonl(event: &LeanCtxEvent) {
118    let Some(path) = jsonl_path() else { return };
119
120    if let Some(parent) = path.parent() {
121        let _ = std::fs::create_dir_all(parent);
122    }
123
124    if let Ok(content) = std::fs::read_to_string(&path) {
125        let lines = content.lines().count();
126        if lines >= JSONL_MAX_LINES {
127            let old = path.with_extension("jsonl.old");
128            let _ = std::fs::remove_file(&old);
129            let _ = std::fs::rename(&path, &old);
130        }
131    }
132
133    if let Ok(json) = serde_json::to_string(event) {
134        use std::io::Write;
135        if let Ok(mut f) = std::fs::OpenOptions::new()
136            .create(true)
137            .append(true)
138            .open(&path)
139        {
140            let _ = writeln!(f, "{json}");
141        }
142    }
143}
144
145// --- Public API ---
146
147pub fn emit(kind: EventKind) -> u64 {
148    bus().emit(kind)
149}
150
151pub fn events_since(after_id: u64) -> Vec<LeanCtxEvent> {
152    bus().events_since(after_id)
153}
154
155pub fn latest_events(n: usize) -> Vec<LeanCtxEvent> {
156    bus().latest_events(n)
157}
158
159pub fn load_events_from_file(n: usize) -> Vec<LeanCtxEvent> {
160    let Some(path) = jsonl_path() else {
161        return Vec::new();
162    };
163    let Ok(content) = std::fs::read_to_string(&path) else {
164        return Vec::new();
165    };
166    let all: Vec<LeanCtxEvent> = content
167        .lines()
168        .filter(|l| !l.trim().is_empty())
169        .filter_map(|l| serde_json::from_str(l).ok())
170        .collect();
171    let start = all.len().saturating_sub(n);
172    all[start..].to_vec()
173}
174
175pub fn emit_tool_call(
176    tool: &str,
177    tokens_original: u64,
178    tokens_saved: u64,
179    mode: Option<String>,
180    duration_ms: u64,
181    path: Option<String>,
182) {
183    emit(EventKind::ToolCall {
184        tool: tool.to_string(),
185        tokens_original,
186        tokens_saved,
187        mode,
188        duration_ms,
189        path,
190    });
191}
192
193pub fn emit_cache_hit(path: &str, saved_tokens: u64) {
194    emit(EventKind::CacheHit {
195        path: path.to_string(),
196        saved_tokens,
197    });
198}
199
200pub fn emit_agent_action(agent_id: &str, action: &str, tool: Option<&str>) {
201    emit(EventKind::AgentAction {
202        agent_id: agent_id.to_string(),
203        action: action.to_string(),
204        tool: tool.map(|t| t.to_string()),
205    });
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `CacheHit` payload for the filter test.
    fn cache_hit(path: &str, saved_tokens: u64) -> EventKind {
        EventKind::CacheHit {
            path: path.to_string(),
            saved_tokens,
        }
    }

    /// An emitted event gets a non-zero id and shows up among the most recent
    /// events on the bus.
    #[test]
    fn emit_returns_positive_id() {
        let payload = EventKind::ToolCall {
            tool: "ctx_read".to_string(),
            tokens_original: 1000,
            tokens_saved: 800,
            mode: Some("map".to_string()),
            duration_ms: 5,
            path: Some("src/main.rs".to_string()),
        };

        let id = emit(payload);
        assert!(id > 0);

        let recent = latest_events(100);
        assert!(recent.iter().any(|e| e.id == id));
    }

    /// `events_since(id)` includes later events and nothing at or before `id`.
    #[test]
    fn events_since_filters_correctly() {
        let id1 = emit(cache_hit("filter_test_a.rs", 100));
        let id2 = emit(cache_hit("filter_test_b.rs", 200));

        let after = events_since(id1);
        assert!(after.iter().any(|e| e.id == id2));
        assert!(after.iter().all(|e| e.id > id1));
    }
}