1use std::collections::HashMap;
8use std::path::Path;
9
10use rusqlite::{params, Connection};
11
12use crate::types::GraphError;
13
/// One recorded command invocation, as stored in the `events` table of a
/// [`TelemetryStore`].
///
/// The three map fields are persisted as JSON text (see [`TelemetryStore::record`]).
#[derive(Debug, Clone, serde::Serialize)]
pub struct TelemetryEvent {
    /// SQLite row id: `None` for events not yet stored (as built by [`new_event`]),
    /// `Some` for events read back via [`TelemetryStore::recent_events`].
    pub id: Option<i64>,
    /// UTC time in `YYYY-MM-DD HH:MM:SS` form; compared as text against SQLite
    /// `datetime()` values when aggregating and pruning.
    pub timestamp: String,
    /// Command name. Aggregation treats `compile`, `map`, `mcp:session` and other
    /// `mcp:`-prefixed commands specially.
    pub command: String,
    /// Command duration; the `_ms` suffix indicates milliseconds.
    pub duration_ms: u64,
    /// Exit code reported for the command.
    pub exit_code: i32,
    /// Number of errors reported by the command (summed during aggregation).
    pub error_count: u32,
    /// Number of warnings reported by the command (summed during aggregation).
    pub warning_count: u32,
    /// Node count reported by the command. For `mcp:session` events, aggregation reads
    /// this field as the number of tool calls made in the session.
    pub node_count: u32,
    /// Edge count reported by the command.
    pub edge_count: u32,
    /// Per-language counts; aggregation reports the most recent non-empty mix.
    pub language_mix: HashMap<String, u32>,
    /// Counts keyed by resolution tier (stored and read back, not aggregated).
    pub resolution_tiers: HashMap<String, u32>,
    /// Number of circuit-breaker events recorded for the command.
    pub circuit_breaker_events: u32,
    /// Counts per error code; summed across events during aggregation.
    pub error_codes: HashMap<String, u32>,
    /// Name of the client that issued the command, if known. Only events with a client
    /// name contribute to the per-agent statistics.
    pub client_name: Option<String>,
    /// Presumably the number of previously reported violations now resolved — TODO confirm.
    pub violations_resolved: u32,
    /// Presumably the number of violations still present — TODO confirm.
    pub violations_persisted: u32,
    /// Presumably the number of newly introduced violations — TODO confirm.
    pub violations_new: u32,
}
39
/// Per-client usage statistics derived from `mcp:`-prefixed events.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct AgentStats {
    /// Number of `mcp:session` events from this client.
    pub sessions: u64,
    /// Sum of the `node_count` values of this client's `mcp:session` events, which are
    /// interpreted as tool calls.
    pub total_tool_calls: u64,
    /// `total_tool_calls / sessions`; stays at the default `0.0` when there are no sessions.
    pub avg_tool_calls_per_session: f64,
    /// Event count per `mcp:` command other than `mcp:session`, keyed by full command name.
    pub tool_usage: HashMap<String, u64>,
}
48
/// Summary of the events recorded within a time window, produced by
/// [`TelemetryStore::aggregate`].
#[derive(Debug, Clone, serde::Serialize)]
pub struct TelemetryAggregate {
    /// Number of events in the window.
    pub total_invocations: u64,
    /// Mean `duration_ms` of `compile` events, or `None` if there were none.
    pub avg_compile_ms: Option<f64>,
    /// Mean `duration_ms` of `map` events, or `None` if there were none.
    pub avg_map_ms: Option<f64>,
    /// Sum of `error_count` over the window.
    pub total_errors: u64,
    /// Sum of `warning_count` over the window.
    pub total_warnings: u64,
    /// Number of events per command name.
    pub command_counts: HashMap<String, u64>,
    /// Language mix of the most recent event with a non-empty mix, preferring `map` events.
    ///
    /// NOTE(review): the values are the snapshot's raw counts converted to `f64`, not
    /// percentages — confirm whether callers expect normalized values.
    pub language_percentages: HashMap<String, f64>,
    /// Error-code counts summed over every event in the window (all codes, not a top-N).
    pub top_error_codes: HashMap<String, u64>,
    /// Per-client statistics, keyed by `client_name`.
    pub agent_stats: HashMap<String, AgentStats>,
}
62
/// SQLite-backed store for [`TelemetryEvent`]s, kept in a single `events` table.
pub struct TelemetryStore {
    /// The connection every query of this store goes through.
    conn: Connection,
}
67
68impl TelemetryStore {
69 pub fn open(path: &Path) -> Result<Self, GraphError> {
71 let conn = Connection::open(path)?;
72 conn.execute_batch(
73 "PRAGMA journal_mode = WAL;
74 PRAGMA synchronous = NORMAL;",
75 )?;
76 let store = Self { conn };
77 store.initialize_schema()?;
78 Ok(store)
79 }
80
81 pub fn in_memory() -> Result<Self, GraphError> {
83 let conn = Connection::open_in_memory()?;
84 let store = Self { conn };
85 store.initialize_schema()?;
86 Ok(store)
87 }
88
89 fn initialize_schema(&self) -> Result<(), GraphError> {
90 self.conn.execute_batch(
91 "CREATE TABLE IF NOT EXISTS events (
92 id INTEGER PRIMARY KEY AUTOINCREMENT,
93 timestamp TEXT NOT NULL,
94 command TEXT NOT NULL,
95 duration_ms INTEGER NOT NULL,
96 exit_code INTEGER NOT NULL,
97 error_count INTEGER DEFAULT 0,
98 warning_count INTEGER DEFAULT 0,
99 node_count INTEGER DEFAULT 0,
100 edge_count INTEGER DEFAULT 0,
101 language_mix TEXT DEFAULT '{}',
102 resolution_tiers TEXT DEFAULT '{}',
103 circuit_breaker_events INTEGER DEFAULT 0,
104 error_codes TEXT DEFAULT '{}',
105 client_name TEXT
106 );
107 CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
108 CREATE INDEX IF NOT EXISTS idx_events_command ON events(command);",
109 )?;
110 let _ = self
112 .conn
113 .execute_batch("ALTER TABLE events ADD COLUMN error_codes TEXT DEFAULT '{}'");
114 let _ = self
115 .conn
116 .execute_batch("ALTER TABLE events ADD COLUMN client_name TEXT");
117 let _ = self
118 .conn
119 .execute_batch("ALTER TABLE events ADD COLUMN violations_resolved INTEGER DEFAULT 0");
120 let _ = self
121 .conn
122 .execute_batch("ALTER TABLE events ADD COLUMN violations_persisted INTEGER DEFAULT 0");
123 let _ = self
124 .conn
125 .execute_batch("ALTER TABLE events ADD COLUMN violations_new INTEGER DEFAULT 0");
126 Ok(())
127 }
128
129 pub fn record(&self, event: &TelemetryEvent) -> Result<(), GraphError> {
131 let lang_json = serde_json::to_string(&event.language_mix).unwrap_or_default();
132 let tier_json = serde_json::to_string(&event.resolution_tiers).unwrap_or_default();
133 let codes_json = serde_json::to_string(&event.error_codes).unwrap_or_default();
134 self.conn.execute(
135 "INSERT INTO events (timestamp, command, duration_ms, exit_code,
136 error_count, warning_count, node_count, edge_count,
137 language_mix, resolution_tiers, circuit_breaker_events,
138 error_codes, client_name,
139 violations_resolved, violations_persisted, violations_new)
140 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
141 params![
142 event.timestamp,
143 event.command,
144 event.duration_ms,
145 event.exit_code,
146 event.error_count,
147 event.warning_count,
148 event.node_count,
149 event.edge_count,
150 lang_json,
151 tier_json,
152 event.circuit_breaker_events,
153 codes_json,
154 event.client_name,
155 event.violations_resolved,
156 event.violations_persisted,
157 event.violations_new,
158 ],
159 )?;
160 Ok(())
161 }
162
163 pub fn recent_events(&self, limit: u32) -> Result<Vec<TelemetryEvent>, GraphError> {
165 let mut stmt = self.conn.prepare(
166 "SELECT id, timestamp, command, duration_ms, exit_code,
167 error_count, warning_count, node_count, edge_count,
168 language_mix, resolution_tiers, circuit_breaker_events,
169 error_codes, client_name,
170 violations_resolved, violations_persisted, violations_new
171 FROM events ORDER BY id DESC LIMIT ?1",
172 )?;
173 let rows = stmt.query_map(params![limit], |row| {
174 let lang_str: String = row.get(9)?;
175 let tier_str: String = row.get(10)?;
176 let codes_str: String = row.get::<_, Option<String>>(12)?.unwrap_or_default();
177 Ok(TelemetryEvent {
178 id: Some(row.get(0)?),
179 timestamp: row.get(1)?,
180 command: row.get(2)?,
181 duration_ms: row.get(3)?,
182 exit_code: row.get(4)?,
183 error_count: row.get(5)?,
184 warning_count: row.get(6)?,
185 node_count: row.get(7)?,
186 edge_count: row.get(8)?,
187 language_mix: serde_json::from_str(&lang_str).unwrap_or_default(),
188 resolution_tiers: serde_json::from_str(&tier_str).unwrap_or_default(),
189 circuit_breaker_events: row.get(11)?,
190 error_codes: serde_json::from_str(&codes_str).unwrap_or_default(),
191 client_name: row.get(13)?,
192 violations_resolved: row.get(14)?,
193 violations_persisted: row.get(15)?,
194 violations_new: row.get(16)?,
195 })
196 })?;
197 let mut events = Vec::new();
198 for row in rows {
199 events.push(row?);
200 }
201 Ok(events)
202 }
203
204 pub fn aggregate(&self, days: u32) -> Result<TelemetryAggregate, GraphError> {
206 let cutoff = format!("datetime('now', '-{} days')", days);
207
208 let total: u64 = self.conn.query_row(
209 &format!("SELECT COUNT(*) FROM events WHERE timestamp >= {cutoff}"),
210 [],
211 |r| r.get(0),
212 )?;
213
214 let total_errors: u64 = self.conn.query_row(
215 &format!(
216 "SELECT COALESCE(SUM(error_count), 0) FROM events WHERE timestamp >= {cutoff}"
217 ),
218 [],
219 |r| r.get(0),
220 )?;
221
222 let total_warnings: u64 = self.conn.query_row(
223 &format!(
224 "SELECT COALESCE(SUM(warning_count), 0) FROM events WHERE timestamp >= {cutoff}"
225 ),
226 [],
227 |r| r.get(0),
228 )?;
229
230 let avg_compile: Option<f64> = self.conn.query_row(
231 &format!(
232 "SELECT AVG(duration_ms) FROM events WHERE command = 'compile' AND timestamp >= {cutoff}"
233 ),
234 [],
235 |r| r.get(0),
236 )?;
237
238 let avg_map: Option<f64> = self.conn.query_row(
239 &format!(
240 "SELECT AVG(duration_ms) FROM events WHERE command = 'map' AND timestamp >= {cutoff}"
241 ),
242 [],
243 |r| r.get(0),
244 )?;
245
246 let mut cmd_stmt = self.conn.prepare(
248 &format!(
249 "SELECT command, COUNT(*) FROM events WHERE timestamp >= {cutoff} GROUP BY command ORDER BY COUNT(*) DESC"
250 ),
251 )?;
252 let mut command_counts = HashMap::new();
253 let cmd_rows = cmd_stmt.query_map([], |row| {
254 Ok((row.get::<_, String>(0)?, row.get::<_, u64>(1)?))
255 })?;
256 for row in cmd_rows {
257 let (cmd, count) = row?;
258 command_counts.insert(cmd, count);
259 }
260
261 let language_percentages: HashMap<String, f64> = {
265 let mut lang_stmt = self.conn.prepare(&format!(
266 "SELECT language_mix FROM events \
267 WHERE timestamp >= {cutoff} AND command = 'map' AND language_mix != '{{}}' \
268 ORDER BY id DESC LIMIT 1"
269 ))?;
270 let result: Option<String> =
271 lang_stmt.query_row([], |row| row.get::<_, String>(0)).ok();
272
273 let json_str = match result {
275 Some(s) => s,
276 None => {
277 let mut fallback = self.conn.prepare(&format!(
278 "SELECT language_mix FROM events \
279 WHERE timestamp >= {cutoff} AND language_mix != '{{}}' \
280 ORDER BY id DESC LIMIT 1"
281 ))?;
282 fallback
283 .query_row([], |row| row.get::<_, String>(0))
284 .unwrap_or_default()
285 }
286 };
287
288 if let Ok(map) = serde_json::from_str::<HashMap<String, u32>>(&json_str) {
289 map.into_iter().map(|(k, v)| (k, v as f64)).collect()
290 } else {
291 HashMap::new()
292 }
293 };
294
295 let mut codes_stmt = self.conn.prepare(&format!(
297 "SELECT error_codes FROM events WHERE timestamp >= {cutoff}"
298 ))?;
299 let mut top_error_codes: HashMap<String, u64> = HashMap::new();
300 let codes_rows = codes_stmt.query_map([], |row| row.get::<_, Option<String>>(0))?;
301 for row in codes_rows {
302 if let Some(json_str) = row? {
303 if let Ok(map) = serde_json::from_str::<HashMap<String, u32>>(&json_str) {
304 for (code, count) in map {
305 *top_error_codes.entry(code).or_default() += count as u64;
306 }
307 }
308 }
309 }
310
311 let mut agent_stats: HashMap<String, AgentStats> = HashMap::new();
313 let mut agent_stmt = self.conn.prepare(&format!(
314 "SELECT command, client_name, node_count FROM events WHERE client_name IS NOT NULL AND timestamp >= {cutoff}"
315 ))?;
316 let agent_rows = agent_stmt.query_map([], |row| {
317 Ok((
318 row.get::<_, String>(0)?,
319 row.get::<_, String>(1)?,
320 row.get::<_, u32>(2)?,
321 ))
322 })?;
323 for row in agent_rows {
324 let (command, client, node_count) = row?;
325 let stats = agent_stats.entry(client).or_default();
326 if command == "mcp:session" {
327 stats.sessions += 1;
328 stats.total_tool_calls += node_count as u64;
329 } else if command.starts_with("mcp:") {
330 *stats.tool_usage.entry(command).or_default() += 1;
331 }
332 }
333 for stats in agent_stats.values_mut() {
335 if stats.sessions > 0 {
336 stats.avg_tool_calls_per_session =
337 stats.total_tool_calls as f64 / stats.sessions as f64;
338 }
339 }
340
341 Ok(TelemetryAggregate {
342 total_invocations: total,
343 avg_compile_ms: avg_compile,
344 avg_map_ms: avg_map,
345 total_errors,
346 total_warnings,
347 command_counts,
348 language_percentages,
349 top_error_codes,
350 agent_stats,
351 })
352 }
353
354 pub fn prune(&self, days: u32) -> Result<u64, GraphError> {
356 let deleted = self.conn.execute(
357 &format!(
358 "DELETE FROM events WHERE timestamp < datetime('now', '-{} days')",
359 days
360 ),
361 [],
362 )?;
363 Ok(deleted as u64)
364 }
365}
366
367pub fn new_event(command: &str, duration_ms: u64, exit_code: i32) -> TelemetryEvent {
369 TelemetryEvent {
370 id: None,
371 timestamp: chrono_utc_now(),
372 command: command.to_string(),
373 duration_ms,
374 exit_code,
375 error_count: 0,
376 warning_count: 0,
377 node_count: 0,
378 edge_count: 0,
379 language_mix: HashMap::new(),
380 resolution_tiers: HashMap::new(),
381 circuit_breaker_events: 0,
382 error_codes: HashMap::new(),
383 client_name: None,
384 violations_resolved: 0,
385 violations_persisted: 0,
386 violations_new: 0,
387 }
388}
389
390fn chrono_utc_now() -> String {
392 let now = std::time::SystemTime::now();
394 let duration = now
395 .duration_since(std::time::UNIX_EPOCH)
396 .unwrap_or_default();
397 let secs = duration.as_secs();
398 let days_since_epoch = secs / 86400;
399 let time_of_day = secs % 86400;
400 let hours = time_of_day / 3600;
401 let minutes = (time_of_day % 3600) / 60;
402 let seconds = time_of_day % 60;
403
404 let (year, month, day) = days_to_ymd(days_since_epoch);
406 format!(
407 "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
408 year, month, day, hours, minutes, seconds
409 )
410}
411
412fn days_to_ymd(mut days: u64) -> (u64, u64, u64) {
413 let mut year = 1970;
415 loop {
416 let days_in_year = if is_leap(year) { 366 } else { 365 };
417 if days < days_in_year {
418 break;
419 }
420 days -= days_in_year;
421 year += 1;
422 }
423 let months: [u64; 12] = if is_leap(year) {
424 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
425 } else {
426 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
427 };
428 let mut month = 1;
429 for &m in &months {
430 if days < m {
431 break;
432 }
433 days -= m;
434 month += 1;
435 }
436 (year, month, days + 1)
437}
438
/// Reports whether `year` is a leap year in the Gregorian calendar.
///
/// Every fourth year is a leap year, except century years, which qualify only when
/// divisible by 400 (2000 was a leap year, 1900 was not).
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        true
    } else if year.is_multiple_of(100) {
        false
    } else {
        year.is_multiple_of(4)
    }
}
442
443#[cfg(test)]
444#[path = "telemetry_tests.rs"]
445mod tests;