Skip to main content

keel_core/
telemetry.rs

1//! Privacy-safe telemetry engine for keel.
2//!
3//! Stores aggregate command metrics in a separate `telemetry.db` SQLite database.
4//! **By design**, no fields exist for file paths, function names, source code,
5//! git history, or any user-identifiable information.
6
use std::collections::{HashMap, HashSet};
use std::path::Path;

use rusqlite::{params, Connection, OptionalExtension};

use crate::types::GraphError;
13
/// A single telemetry event recorded after a command completes.
///
/// Persisted by [`TelemetryStore::record`] and read back by
/// [`TelemetryStore::recent_events`]. The map-valued fields are stored as JSON text.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TelemetryEvent {
    /// SQLite row id: `None` when built by `new_event`, `Some(rowid)` when loaded from the store.
    pub id: Option<i64>,
    /// UTC creation time; `new_event` formats it as `YYYY-MM-DD HH:MM:SS`.
    pub timestamp: String,
    /// Command name, e.g. `compile`, `map`, or an `mcp:`-prefixed MCP tool/session name.
    pub command: String,
    /// Command duration in milliseconds.
    pub duration_ms: u64,
    /// Exit code reported for the command.
    pub exit_code: i32,
    /// Number of errors reported by the command.
    pub error_count: u32,
    /// Number of warnings reported by the command.
    pub warning_count: u32,
    /// Graph node count for CLI commands; repurposed as tool_call_count for MCP session events.
    pub node_count: u32,
    /// Graph edge count (presumably the counterpart of `node_count` — confirm with callers).
    pub edge_count: u32,
    /// Per-language counts; presumably keyed by language name — TODO confirm with callers.
    pub language_mix: HashMap<String, u32>,
    /// Counts keyed by resolution tier; stored as JSON but not aggregated in this module.
    pub resolution_tiers: HashMap<String, u32>,
    /// Number of circuit-breaker events during the command.
    pub circuit_breaker_events: u32,
    /// Occurrence count per error code; summed across events by `TelemetryStore::aggregate`.
    pub error_codes: HashMap<String, u32>,
    /// Client (agent) that issued the command, if known; agent stats are grouped by it.
    pub client_name: Option<String>,
    /// Violations from previous compile that are now gone (agent acted on keel advice).
    pub violations_resolved: u32,
    /// Violations from previous compile that persist (agent ignored keel).
    pub violations_persisted: u32,
    /// New violations not present in previous compile.
    pub violations_new: u32,
}
39
/// Per-agent adoption metrics, keyed by `client_name` in [`TelemetryAggregate::agent_stats`].
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct AgentStats {
    /// Number of `mcp:session` events from this client.
    pub sessions: u64,
    /// Sum of tool calls across sessions (taken from each session event's `node_count`).
    pub total_tool_calls: u64,
    /// `total_tool_calls / sessions`; left at `0.0` when there are no sessions.
    pub avg_tool_calls_per_session: f64,
    /// Invocation count per `mcp:`-prefixed command, excluding `mcp:session` itself.
    pub tool_usage: HashMap<String, u64>,
}
48
/// Aggregated telemetry over a time window (the last N days, see `TelemetryStore::aggregate`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct TelemetryAggregate {
    /// Number of events recorded in the window.
    pub total_invocations: u64,
    /// Mean duration of `compile` events in milliseconds; `None` if there were none.
    pub avg_compile_ms: Option<f64>,
    /// Mean duration of `map` events in milliseconds; `None` if there were none.
    pub avg_map_ms: Option<f64>,
    /// Sum of `error_count` over the window.
    pub total_errors: u64,
    /// Sum of `warning_count` over the window.
    pub total_warnings: u64,
    /// Number of events per command name.
    pub command_counts: HashMap<String, u64>,
    /// Per-language values derived from the most recent non-empty `language_mix`,
    /// preferring `map` events.
    pub language_percentages: HashMap<String, f64>,
    /// Summed occurrences per error code. This holds every code seen, not a truncated top list.
    pub top_error_codes: HashMap<String, u64>,
    /// Adoption metrics per client name (only events with a `client_name`).
    pub agent_stats: HashMap<String, AgentStats>,
}
62
/// SQLite-backed telemetry store (separate from graph.db).
///
/// Wraps a single connection holding the `events` table. Create one with
/// [`TelemetryStore::open`] (on disk) or [`TelemetryStore::in_memory`] (tests).
pub struct TelemetryStore {
    /// Connection to `telemetry.db`, or to an in-memory database.
    conn: Connection,
}
67
68impl TelemetryStore {
69    /// Open or create `telemetry.db` at the given path.
70    pub fn open(path: &Path) -> Result<Self, GraphError> {
71        let conn = Connection::open(path)?;
72        conn.execute_batch(
73            "PRAGMA journal_mode = WAL;
74             PRAGMA synchronous = NORMAL;",
75        )?;
76        let store = Self { conn };
77        store.initialize_schema()?;
78        Ok(store)
79    }
80
81    /// Create an in-memory telemetry store (for testing).
82    pub fn in_memory() -> Result<Self, GraphError> {
83        let conn = Connection::open_in_memory()?;
84        let store = Self { conn };
85        store.initialize_schema()?;
86        Ok(store)
87    }
88
89    fn initialize_schema(&self) -> Result<(), GraphError> {
90        self.conn.execute_batch(
91            "CREATE TABLE IF NOT EXISTS events (
92                id INTEGER PRIMARY KEY AUTOINCREMENT,
93                timestamp TEXT NOT NULL,
94                command TEXT NOT NULL,
95                duration_ms INTEGER NOT NULL,
96                exit_code INTEGER NOT NULL,
97                error_count INTEGER DEFAULT 0,
98                warning_count INTEGER DEFAULT 0,
99                node_count INTEGER DEFAULT 0,
100                edge_count INTEGER DEFAULT 0,
101                language_mix TEXT DEFAULT '{}',
102                resolution_tiers TEXT DEFAULT '{}',
103                circuit_breaker_events INTEGER DEFAULT 0,
104                error_codes TEXT DEFAULT '{}',
105                client_name TEXT
106            );
107            CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp);
108            CREATE INDEX IF NOT EXISTS idx_events_command ON events(command);",
109        )?;
110        // Migrate existing tables that lack the new columns
111        let _ = self
112            .conn
113            .execute_batch("ALTER TABLE events ADD COLUMN error_codes TEXT DEFAULT '{}'");
114        let _ = self
115            .conn
116            .execute_batch("ALTER TABLE events ADD COLUMN client_name TEXT");
117        let _ = self
118            .conn
119            .execute_batch("ALTER TABLE events ADD COLUMN violations_resolved INTEGER DEFAULT 0");
120        let _ = self
121            .conn
122            .execute_batch("ALTER TABLE events ADD COLUMN violations_persisted INTEGER DEFAULT 0");
123        let _ = self
124            .conn
125            .execute_batch("ALTER TABLE events ADD COLUMN violations_new INTEGER DEFAULT 0");
126        Ok(())
127    }
128
129    /// Record a single telemetry event.
130    pub fn record(&self, event: &TelemetryEvent) -> Result<(), GraphError> {
131        let lang_json = serde_json::to_string(&event.language_mix).unwrap_or_default();
132        let tier_json = serde_json::to_string(&event.resolution_tiers).unwrap_or_default();
133        let codes_json = serde_json::to_string(&event.error_codes).unwrap_or_default();
134        self.conn.execute(
135            "INSERT INTO events (timestamp, command, duration_ms, exit_code,
136             error_count, warning_count, node_count, edge_count,
137             language_mix, resolution_tiers, circuit_breaker_events,
138             error_codes, client_name,
139             violations_resolved, violations_persisted, violations_new)
140             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
141            params![
142                event.timestamp,
143                event.command,
144                event.duration_ms,
145                event.exit_code,
146                event.error_count,
147                event.warning_count,
148                event.node_count,
149                event.edge_count,
150                lang_json,
151                tier_json,
152                event.circuit_breaker_events,
153                codes_json,
154                event.client_name,
155                event.violations_resolved,
156                event.violations_persisted,
157                event.violations_new,
158            ],
159        )?;
160        Ok(())
161    }
162
163    /// Retrieve recent events, most recent first.
164    pub fn recent_events(&self, limit: u32) -> Result<Vec<TelemetryEvent>, GraphError> {
165        let mut stmt = self.conn.prepare(
166            "SELECT id, timestamp, command, duration_ms, exit_code,
167                    error_count, warning_count, node_count, edge_count,
168                    language_mix, resolution_tiers, circuit_breaker_events,
169                    error_codes, client_name,
170                    violations_resolved, violations_persisted, violations_new
171             FROM events ORDER BY id DESC LIMIT ?1",
172        )?;
173        let rows = stmt.query_map(params![limit], |row| {
174            let lang_str: String = row.get(9)?;
175            let tier_str: String = row.get(10)?;
176            let codes_str: String = row.get::<_, Option<String>>(12)?.unwrap_or_default();
177            Ok(TelemetryEvent {
178                id: Some(row.get(0)?),
179                timestamp: row.get(1)?,
180                command: row.get(2)?,
181                duration_ms: row.get(3)?,
182                exit_code: row.get(4)?,
183                error_count: row.get(5)?,
184                warning_count: row.get(6)?,
185                node_count: row.get(7)?,
186                edge_count: row.get(8)?,
187                language_mix: serde_json::from_str(&lang_str).unwrap_or_default(),
188                resolution_tiers: serde_json::from_str(&tier_str).unwrap_or_default(),
189                circuit_breaker_events: row.get(11)?,
190                error_codes: serde_json::from_str(&codes_str).unwrap_or_default(),
191                client_name: row.get(13)?,
192                violations_resolved: row.get(14)?,
193                violations_persisted: row.get(15)?,
194                violations_new: row.get(16)?,
195            })
196        })?;
197        let mut events = Vec::new();
198        for row in rows {
199            events.push(row?);
200        }
201        Ok(events)
202    }
203
204    /// Aggregate telemetry over the last N days.
205    pub fn aggregate(&self, days: u32) -> Result<TelemetryAggregate, GraphError> {
206        let cutoff = format!("datetime('now', '-{} days')", days);
207
208        let total: u64 = self.conn.query_row(
209            &format!("SELECT COUNT(*) FROM events WHERE timestamp >= {cutoff}"),
210            [],
211            |r| r.get(0),
212        )?;
213
214        let total_errors: u64 = self.conn.query_row(
215            &format!(
216                "SELECT COALESCE(SUM(error_count), 0) FROM events WHERE timestamp >= {cutoff}"
217            ),
218            [],
219            |r| r.get(0),
220        )?;
221
222        let total_warnings: u64 = self.conn.query_row(
223            &format!(
224                "SELECT COALESCE(SUM(warning_count), 0) FROM events WHERE timestamp >= {cutoff}"
225            ),
226            [],
227            |r| r.get(0),
228        )?;
229
230        let avg_compile: Option<f64> = self.conn.query_row(
231            &format!(
232                "SELECT AVG(duration_ms) FROM events WHERE command = 'compile' AND timestamp >= {cutoff}"
233            ),
234            [],
235            |r| r.get(0),
236        )?;
237
238        let avg_map: Option<f64> = self.conn.query_row(
239            &format!(
240                "SELECT AVG(duration_ms) FROM events WHERE command = 'map' AND timestamp >= {cutoff}"
241            ),
242            [],
243            |r| r.get(0),
244        )?;
245
246        // Command counts
247        let mut cmd_stmt = self.conn.prepare(
248            &format!(
249                "SELECT command, COUNT(*) FROM events WHERE timestamp >= {cutoff} GROUP BY command ORDER BY COUNT(*) DESC"
250            ),
251        )?;
252        let mut command_counts = HashMap::new();
253        let cmd_rows = cmd_stmt.query_map([], |row| {
254            Ok((row.get::<_, String>(0)?, row.get::<_, u64>(1)?))
255        })?;
256        for row in cmd_rows {
257            let (cmd, count) = row?;
258            command_counts.insert(cmd, count);
259        }
260
261        // Language percentages — use the latest `map` event's language_mix
262        // (most accurate snapshot since map scans all files).
263        // Falls back to latest event with any language_mix if no map events exist.
264        let language_percentages: HashMap<String, f64> = {
265            let mut lang_stmt = self.conn.prepare(&format!(
266                "SELECT language_mix FROM events \
267                 WHERE timestamp >= {cutoff} AND command = 'map' AND language_mix != '{{}}' \
268                 ORDER BY id DESC LIMIT 1"
269            ))?;
270            let result: Option<String> =
271                lang_stmt.query_row([], |row| row.get::<_, String>(0)).ok();
272
273            // Fallback: any event with a non-empty language_mix
274            let json_str = match result {
275                Some(s) => s,
276                None => {
277                    let mut fallback = self.conn.prepare(&format!(
278                        "SELECT language_mix FROM events \
279                         WHERE timestamp >= {cutoff} AND language_mix != '{{}}' \
280                         ORDER BY id DESC LIMIT 1"
281                    ))?;
282                    fallback
283                        .query_row([], |row| row.get::<_, String>(0))
284                        .unwrap_or_default()
285                }
286            };
287
288            if let Ok(map) = serde_json::from_str::<HashMap<String, u32>>(&json_str) {
289                map.into_iter().map(|(k, v)| (k, v as f64)).collect()
290            } else {
291                HashMap::new()
292            }
293        };
294
295        // Error code aggregation
296        let mut codes_stmt = self.conn.prepare(&format!(
297            "SELECT error_codes FROM events WHERE timestamp >= {cutoff}"
298        ))?;
299        let mut top_error_codes: HashMap<String, u64> = HashMap::new();
300        let codes_rows = codes_stmt.query_map([], |row| row.get::<_, Option<String>>(0))?;
301        for row in codes_rows {
302            if let Some(json_str) = row? {
303                if let Ok(map) = serde_json::from_str::<HashMap<String, u32>>(&json_str) {
304                    for (code, count) in map {
305                        *top_error_codes.entry(code).or_default() += count as u64;
306                    }
307                }
308            }
309        }
310
311        // Agent stats aggregation
312        let mut agent_stats: HashMap<String, AgentStats> = HashMap::new();
313        let mut agent_stmt = self.conn.prepare(&format!(
314            "SELECT command, client_name, node_count FROM events WHERE client_name IS NOT NULL AND timestamp >= {cutoff}"
315        ))?;
316        let agent_rows = agent_stmt.query_map([], |row| {
317            Ok((
318                row.get::<_, String>(0)?,
319                row.get::<_, String>(1)?,
320                row.get::<_, u32>(2)?,
321            ))
322        })?;
323        for row in agent_rows {
324            let (command, client, node_count) = row?;
325            let stats = agent_stats.entry(client).or_default();
326            if command == "mcp:session" {
327                stats.sessions += 1;
328                stats.total_tool_calls += node_count as u64;
329            } else if command.starts_with("mcp:") {
330                *stats.tool_usage.entry(command).or_default() += 1;
331            }
332        }
333        // Compute averages
334        for stats in agent_stats.values_mut() {
335            if stats.sessions > 0 {
336                stats.avg_tool_calls_per_session =
337                    stats.total_tool_calls as f64 / stats.sessions as f64;
338            }
339        }
340
341        Ok(TelemetryAggregate {
342            total_invocations: total,
343            avg_compile_ms: avg_compile,
344            avg_map_ms: avg_map,
345            total_errors,
346            total_warnings,
347            command_counts,
348            language_percentages,
349            top_error_codes,
350            agent_stats,
351        })
352    }
353
354    /// Delete events older than N days.
355    pub fn prune(&self, days: u32) -> Result<u64, GraphError> {
356        let deleted = self.conn.execute(
357            &format!(
358                "DELETE FROM events WHERE timestamp < datetime('now', '-{} days')",
359                days
360            ),
361            [],
362        )?;
363        Ok(deleted as u64)
364    }
365}
366
367/// Create a new `TelemetryEvent` with the current UTC timestamp.
368pub fn new_event(command: &str, duration_ms: u64, exit_code: i32) -> TelemetryEvent {
369    TelemetryEvent {
370        id: None,
371        timestamp: chrono_utc_now(),
372        command: command.to_string(),
373        duration_ms,
374        exit_code,
375        error_count: 0,
376        warning_count: 0,
377        node_count: 0,
378        edge_count: 0,
379        language_mix: HashMap::new(),
380        resolution_tiers: HashMap::new(),
381        circuit_breaker_events: 0,
382        error_codes: HashMap::new(),
383        client_name: None,
384        violations_resolved: 0,
385        violations_persisted: 0,
386        violations_new: 0,
387    }
388}
389
390/// UTC timestamp in SQLite native format (`YYYY-MM-DD HH:MM:SS`).
391fn chrono_utc_now() -> String {
392    // Use SystemTime for a dependency-free UTC timestamp
393    let now = std::time::SystemTime::now();
394    let duration = now
395        .duration_since(std::time::UNIX_EPOCH)
396        .unwrap_or_default();
397    let secs = duration.as_secs();
398    let days_since_epoch = secs / 86400;
399    let time_of_day = secs % 86400;
400    let hours = time_of_day / 3600;
401    let minutes = (time_of_day % 3600) / 60;
402    let seconds = time_of_day % 60;
403
404    // Calculate date from days since epoch (1970-01-01)
405    let (year, month, day) = days_to_ymd(days_since_epoch);
406    format!(
407        "{:04}-{:02}-{:02} {:02}:{:02}:{:02}",
408        year, month, day, hours, minutes, seconds
409    )
410}
411
412fn days_to_ymd(mut days: u64) -> (u64, u64, u64) {
413    // Simplified Gregorian calendar conversion
414    let mut year = 1970;
415    loop {
416        let days_in_year = if is_leap(year) { 366 } else { 365 };
417        if days < days_in_year {
418            break;
419        }
420        days -= days_in_year;
421        year += 1;
422    }
423    let months: [u64; 12] = if is_leap(year) {
424        [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
425    } else {
426        [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
427    };
428    let mut month = 1;
429    for &m in &months {
430        if days < m {
431            break;
432        }
433        days -= m;
434        month += 1;
435    }
436    (year, month, days + 1)
437}
438
/// Gregorian leap-year rule: every fourth year, except century years not divisible by 400.
fn is_leap(year: u64) -> bool {
    if year.is_multiple_of(400) {
        return true;
    }
    year.is_multiple_of(4) && !year.is_multiple_of(100)
}
442
// Unit tests live in the sibling file `telemetry_tests.rs`, included via `#[path]`.
#[cfg(test)]
#[path = "telemetry_tests.rs"]
mod tests;