use crate::telemetry::{
    schema::SCHEMA_SQL, AggregatedMetrics, CommunityExport, FeedbackEvent, FeedbackSummary,
    QueryEvent, TelemetryError, TelemetryResult, TimeRange, ToolUsageMetric, TraceEvent,
    TELEMETRY_SCHEMA_VERSION,
};
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection};
use sha2::{Digest, Sha256};
use std::path::Path;
use std::sync::Mutex;
use uuid::Uuid;
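/// SQLite-backed storage for telemetry events (sessions, queries, feedback,
/// reasoning traces) with aggregation and anonymized export.
///
/// Constructed via [`TelemetryStorage::noop`] when telemetry is disabled; in
/// that mode writes are dropped and metric reads return empty or zero values.
///
/// # Example
///
/// A minimal usage sketch (marked `ignore` because the exact crate path is an
/// assumption, not taken from this file):
///
/// ```ignore
/// let mut storage = TelemetryStorage::in_memory()?;
/// let session_id = uuid::Uuid::new_v4();
/// storage.insert_session(session_id).await?;
/// assert_eq!(storage.get_query_count().await?, 0);
/// ```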
pub struct TelemetryStorage {
    conn: Option<Mutex<Connection>>,
    db_path: String,
    is_noop: bool,
}

impl TelemetryStorage {
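    /// Opens (or creates) the SQLite database at `db_path`, creating parent
    /// directories as needed, and applies the schema migration.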
    pub async fn new(db_path: &Path) -> TelemetryResult<Self> {
        let path_str = db_path.to_string_lossy().to_string();

        if let Some(parent) = db_path.parent() {
            if !parent.exists() {
                std::fs::create_dir_all(parent).map_err(|e| {
                    TelemetryError::Io(std::io::Error::other(format!(
                        "Failed to create telemetry directory {:?}: {}",
                        parent, e
                    )))
                })?;
                tracing::info!(path = ?parent, "Created telemetry data directory");
            }
        }

        let conn =
            Connection::open(db_path).map_err(|e| TelemetryError::Database(e.to_string()))?;

        Self::migrate_schema(&conn)?;

        tracing::info!(
            path = %path_str,
            schema_version = TELEMETRY_SCHEMA_VERSION,
            "Initialized telemetry database"
        );

        Ok(Self {
            conn: Some(Mutex::new(conn)),
            db_path: path_str,
            is_noop: false,
        })
    }
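    /// Convenience constructor that opens the database at
    /// `TelemetryConfig::default_db_path()`.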
    pub async fn initialize_default() -> TelemetryResult<Self> {
        use crate::telemetry::TelemetryConfig;
        let db_path = TelemetryConfig::default_db_path();
        Self::new(&db_path).await
    }
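    /// Creates the schema on first use and records `TELEMETRY_SCHEMA_VERSION`;
    /// on an existing database it only advances the recorded version.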
    fn migrate_schema(conn: &Connection) -> TelemetryResult<()> {
        let has_version_table: bool = conn
            .query_row(
                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_version'",
                [],
                |row| row.get::<_, i64>(0).map(|c| c > 0),
            )
            .unwrap_or(false);

        if !has_version_table {
            conn.execute_batch(SCHEMA_SQL).map_err(|e| {
                TelemetryError::Database(format!("Failed to initialize schema: {}", e))
            })?;

            conn.execute(
                "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY, applied_at TEXT)",
                [],
            )
            .map_err(|e| TelemetryError::Database(e.to_string()))?;

            conn.execute(
                "INSERT OR REPLACE INTO schema_version (version, applied_at) VALUES (?1, datetime('now'))",
                params![TELEMETRY_SCHEMA_VERSION as i64],
            )
            .map_err(|e| TelemetryError::Database(e.to_string()))?;

            tracing::info!(
                version = TELEMETRY_SCHEMA_VERSION,
                "Initialized fresh telemetry schema"
            );
        } else {
            let current_version: i64 = conn
                .query_row("SELECT MAX(version) FROM schema_version", [], |row| {
                    row.get(0)
                })
                .unwrap_or(0);

            if current_version < TELEMETRY_SCHEMA_VERSION as i64 {
                tracing::info!(
                    from = current_version,
                    to = TELEMETRY_SCHEMA_VERSION,
                    "Migrating telemetry schema"
                );

                // No structural migrations are applied here; only the recorded
                // schema version is advanced.
                conn.execute(
                    "INSERT OR REPLACE INTO schema_version (version, applied_at) VALUES (?1, datetime('now'))",
                    params![TELEMETRY_SCHEMA_VERSION as i64],
                )
                .map_err(|e| TelemetryError::Database(e.to_string()))?;
            }
        }

        Ok(())
    }
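    /// Opens an in-memory database, primarily for tests.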
    pub fn in_memory() -> TelemetryResult<Self> {
        let conn =
            Connection::open_in_memory().map_err(|e| TelemetryError::Database(e.to_string()))?;

        Self::migrate_schema(&conn)?;

        Ok(Self {
            conn: Some(Mutex::new(conn)),
            db_path: ":memory:".to_string(),
            is_noop: false,
        })
    }
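    /// Returns the highest recorded schema version, or 0 for a no-op store.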
    pub fn schema_version(&self) -> TelemetryResult<u32> {
        if self.is_noop {
            return Ok(0);
        }

        let conn = self.get_conn()?;
        let version: i64 = conn
            .query_row("SELECT MAX(version) FROM schema_version", [], |row| {
                row.get(0)
            })
            .unwrap_or(0);

        Ok(version as u32)
    }
    pub fn db_path(&self) -> &str {
        &self.db_path
    }
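    /// Returns a disabled store: writes become no-ops and metric reads return
    /// `TelemetryError::Disabled` or zero values.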
    pub fn noop() -> Self {
        Self {
            conn: None,
            db_path: String::new(),
            is_noop: true,
        }
    }
    fn get_conn(&self) -> TelemetryResult<std::sync::MutexGuard<'_, Connection>> {
        self.conn
            .as_ref()
            .ok_or(TelemetryError::Disabled)?
            .lock()
            .map_err(|e| TelemetryError::Database(format!("Lock poisoned: {}", e)))
    }
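    /// Records a session row; idempotent thanks to `INSERT OR IGNORE`.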
    pub async fn insert_session(&mut self, session_id: Uuid) -> TelemetryResult<()> {
        if self.is_noop {
            return Ok(());
        }

        let conn = self.get_conn()?;
        let now = Utc::now();

        conn.execute(
            r#"INSERT OR IGNORE INTO sessions (
                id, started_at, client_version
            ) VALUES (?1, ?2, ?3)"#,
            params![
                session_id.to_string(),
                now.to_rfc3339(),
                env!("CARGO_PKG_VERSION"),
            ],
        )
        .map_err(|e| TelemetryError::Database(e.to_string()))?;

        tracing::debug!(
            session_id = %session_id,
            "Created telemetry session"
        );

        Ok(())
    }
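    /// Hashes a normalized (lowercased, whitespace-collapsed) query with
    /// SHA-256 so the raw query text never reaches the database.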
    fn hash_query(query: &str) -> String {
        let normalized = query
            .to_lowercase()
            .split_whitespace()
            .collect::<Vec<_>>()
            .join(" ");

        let mut hasher = Sha256::new();
        hasher.update(normalized.as_bytes());
        format!("{:x}", hasher.finalize())
    }
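    /// Stores a query event. Only the hash and length of the query text are
    /// persisted, never the text itself.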
    pub async fn insert_query_event(&mut self, event: &QueryEvent) -> TelemetryResult<()> {
        if self.is_noop {
            return Ok(());
        }

        let conn = self.get_conn()?;
        let query_hash = Self::hash_query(&event.query_text);
        let tools_json = serde_json::to_string(&event.tools_used).unwrap_or_default();

        conn.execute(
            r#"INSERT INTO queries (
                id, session_id, timestamp, query_hash, query_length,
                query_token_count, query_type, latency_ms, tool_calls,
                retrieval_count, result_count, result_quality_score,
                error_occurred, error_category, profile_used, tools_used
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)"#,
            params![
                event.id.to_string(),
                event.session_id.to_string(),
                event.timestamp.to_rfc3339(),
                query_hash,
                event.query_text.len() as i64,
                // query_token_count is not currently populated.
                None::<i64>,
                format!("{:?}", event.query_type).to_lowercase(),
                event.latency_ms as i64,
                event.tool_calls as i64,
                event.retrieval_count as i64,
                event.result_count as i64,
                event.quality_score,
                event.error.is_some() as i64,
                event
                    .error
                    .as_ref()
                    .map(|e| format!("{:?}", e.category).to_lowercase()),
                event.profile.as_deref(),
                tools_json,
            ],
        )
        .map_err(|e| TelemetryError::Database(e.to_string()))?;

        tracing::debug!(
            event_id = %event.id,
            session_id = %event.session_id,
            query_type = ?event.query_type,
            latency_ms = event.latency_ms,
            "Recorded query event"
        );

        Ok(())
    }
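    /// Stores a user feedback event (type, optional rating and category).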
    pub async fn insert_feedback_event(&mut self, event: &FeedbackEvent) -> TelemetryResult<()> {
        if self.is_noop {
            return Ok(());
        }

        let conn = self.get_conn()?;

        conn.execute(
            r#"INSERT INTO feedback (
                id, session_id, query_id, timestamp,
                feedback_type, rating, category, context_hash
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"#,
            params![
                event.id.to_string(),
                event.session_id.to_string(),
                event.query_id.map(|id| id.to_string()),
                event.timestamp.to_rfc3339(),
                format!("{:?}", event.feedback_type).to_lowercase(),
                event.rating.map(|r| r as i64),
                event
                    .category
                    .as_ref()
                    .map(|c| format!("{:?}", c).to_lowercase()),
                event.context_hash.as_deref(),
            ],
        )
        .map_err(|e| TelemetryError::Database(e.to_string()))?;

        tracing::debug!(
            event_id = %event.id,
            session_id = %event.session_id,
            feedback_type = ?event.feedback_type,
            "Recorded feedback event"
        );

        Ok(())
    }
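    /// Stores a reasoning-trace event (step counts, timings, and scores).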
    pub async fn insert_trace_event(&mut self, event: &TraceEvent) -> TelemetryResult<()> {
        if self.is_noop {
            return Ok(());
        }

        let conn = self.get_conn()?;
        let step_types_json = serde_json::to_string(&event.step_types).unwrap_or_default();

        conn.execute(
            r#"INSERT INTO reasoning_traces (
                id, session_id, query_id, timestamp,
                thinktool_name, step_count, total_ms, avg_step_ms,
                coherence_score, depth_score, step_types
            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"#,
            params![
                event.id.to_string(),
                event.session_id.to_string(),
                event.query_id.map(|id| id.to_string()),
                event.timestamp.to_rfc3339(),
                event.thinktool_name,
                event.step_count as i64,
                event.total_ms as i64,
                event.avg_step_ms,
                event.coherence_score,
                event.depth_score,
                step_types_json,
            ],
        )
        .map_err(|e| TelemetryError::Database(e.to_string()))?;

        tracing::debug!(
            event_id = %event.id,
            session_id = %event.session_id,
            thinktool = %event.thinktool_name,
            steps = event.step_count,
            "Recorded trace event"
        );

        Ok(())
    }
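    /// Aggregates the last 30 days of queries, tool usage, and feedback into
    /// an [`AggregatedMetrics`] snapshot.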
    pub async fn get_aggregated_metrics(&self) -> TelemetryResult<AggregatedMetrics> {
        if self.is_noop {
            return Err(TelemetryError::Disabled);
        }

        let conn = self.get_conn()?;

        let (total_queries, avg_latency): (i64, f64) = conn.query_row(
            "SELECT COUNT(*), COALESCE(AVG(latency_ms), 0) FROM queries WHERE timestamp > datetime('now', '-30 days')",
            [],
            |row| Ok((row.get(0)?, row.get(1)?))
        ).map_err(|e| TelemetryError::Database(e.to_string()))?;

        let mut tool_stmt = conn
            .prepare(
                r#"SELECT
                    tool_name,
                    COUNT(*) as count,
                    SUM(CASE WHEN success = 1 THEN 1 ELSE 0 END) * 1.0 / COUNT(*) as success_rate,
                    AVG(execution_ms) as avg_execution_ms
                FROM tool_usage
                WHERE timestamp > datetime('now', '-30 days')
                GROUP BY tool_name
                ORDER BY count DESC
                LIMIT 20"#,
            )
            .map_err(|e| TelemetryError::Database(e.to_string()))?;

        let tool_usage: Vec<ToolUsageMetric> = tool_stmt
            .query_map([], |row| {
                Ok(ToolUsageMetric {
                    tool: row.get(0)?,
                    count: row.get::<_, i64>(1)? as u64,
                    success_rate: row.get(2)?,
                    avg_execution_ms: row.get(3)?,
                })
            })
            .map_err(|e| TelemetryError::Database(e.to_string()))?
            .filter_map(|r| r.ok())
            .collect();

        let (total_feedback, positive_ratio): (i64, f64) = conn.query_row(
            r#"SELECT
                COUNT(*),
                COALESCE(SUM(CASE WHEN feedback_type = 'thumbs_up' THEN 1 ELSE 0 END) * 1.0 / NULLIF(COUNT(*), 0), 0)
            FROM feedback
            WHERE timestamp > datetime('now', '-30 days')"#,
            [],
            |row| Ok((row.get(0)?, row.get(1)?))
        ).map_err(|e| TelemetryError::Database(e.to_string()))?;

        let (start, end): (String, String) = conn.query_row(
            "SELECT COALESCE(MIN(timestamp), datetime('now')), COALESCE(MAX(timestamp), datetime('now')) FROM queries",
            [],
            |row| Ok((row.get(0)?, row.get(1)?))
        ).map_err(|e| TelemetryError::Database(e.to_string()))?;

        Ok(AggregatedMetrics {
            total_queries: total_queries as u64,
            avg_latency_ms: avg_latency,
            tool_usage,
            // Query clustering is not computed here.
            query_clusters: Vec::new(),
            feedback_summary: FeedbackSummary {
                total_feedback: total_feedback as u64,
                positive_ratio,
                improvement_areas: Vec::new(),
            },
            // Timestamps are stored via `to_rfc3339()`, so parse them directly;
            // fall back to "now" when parsing fails (e.g. the `datetime('now')`
            // COALESCE default above, which is not RFC 3339).
            time_range: TimeRange {
                start: DateTime::parse_from_rfc3339(&start)
                    .map(|d| d.with_timezone(&Utc))
                    .unwrap_or_else(|_| Utc::now()),
                end: DateTime::parse_from_rfc3339(&end)
                    .map(|d| d.with_timezone(&Utc))
                    .unwrap_or_else(|_| Utc::now()),
            },
        })
    }
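    /// Builds a community export containing only aggregated metrics plus a
    /// non-reversible contributor hash; no raw events are included.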
    pub async fn export_anonymized(&self) -> TelemetryResult<CommunityExport> {
        if self.is_noop {
            return Err(TelemetryError::Disabled);
        }

        let aggregates = self.get_aggregated_metrics().await?;
        let contributor_hash = self.generate_contributor_hash();

        Ok(CommunityExport {
            schema_version: TELEMETRY_SCHEMA_VERSION,
            exported_at: Utc::now(),
            aggregates,
            dp_epsilon: 1.0,
            contributor_hash,
        })
    }
    fn generate_contributor_hash(&self) -> String {
        let mut hasher = Sha256::new();
        hasher.update(self.db_path.as_bytes());
        hasher.update(b"reasonkit-contributor-v1");
        format!("{:x}", hasher.finalize())[..16].to_string()
    }
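    /// Rolls one day's raw events up into the `daily_aggregates` table.
    /// `date` is expected in `YYYY-MM-DD` form, matching SQLite's `date()`.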
    pub async fn run_daily_aggregation(&mut self, date: &str) -> TelemetryResult<()> {
        if self.is_noop {
            return Ok(());
        }

        let conn = self.get_conn()?;

        // Note: p50/p95/p99 latencies are approximated with AVG, and the
        // distribution columns are written as empty JSON objects.
        conn.execute(
            r#"INSERT OR REPLACE INTO daily_aggregates (
                date, computed_at,
                session_count, query_count, feedback_count, tool_invocations,
                avg_latency_ms, p50_latency_ms, p95_latency_ms, p99_latency_ms,
                avg_success_rate, positive_feedback_ratio, error_rate,
                tool_distribution, query_type_distribution
            )
            SELECT
                ?1 as date,
                datetime('now') as computed_at,
                COUNT(DISTINCT session_id) as session_count,
                COUNT(*) as query_count,
                (SELECT COUNT(*) FROM feedback WHERE date(timestamp) = ?1) as feedback_count,
                SUM(tool_calls) as tool_invocations,
                AVG(latency_ms) as avg_latency_ms,
                AVG(latency_ms) as p50_latency_ms,
                AVG(latency_ms) as p95_latency_ms,
                AVG(latency_ms) as p99_latency_ms,
                1.0 - (SUM(error_occurred) * 1.0 / NULLIF(COUNT(*), 0)) as avg_success_rate,
                (SELECT SUM(CASE WHEN feedback_type = 'thumbs_up' THEN 1 ELSE 0 END) * 1.0 / NULLIF(COUNT(*), 0)
                 FROM feedback WHERE date(timestamp) = ?1) as positive_feedback_ratio,
                SUM(error_occurred) * 1.0 / NULLIF(COUNT(*), 0) as error_rate,
                '{}' as tool_distribution,
                '{}' as query_type_distribution
            FROM queries
            WHERE date(timestamp) = ?1"#,
            params![date],
        ).map_err(|e| TelemetryError::Database(e.to_string()))?;

        tracing::info!(date = %date, "Ran daily aggregation");

        Ok(())
    }
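    /// Returns the database size in bytes (`page_count * page_size`).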
    pub async fn get_db_size(&self) -> TelemetryResult<u64> {
        if self.is_noop {
            return Ok(0);
        }

        let conn = self.get_conn()?;

        let size: i64 = conn
            .query_row(
                "SELECT page_count * page_size FROM pragma_page_count(), pragma_page_size()",
                [],
                |row| row.get(0),
            )
            .unwrap_or(0);

        Ok(size as u64)
    }
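    /// Deletes queries, feedback, and reasoning traces older than
    /// `retention_days` and returns the number of rows removed.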
    pub async fn prune_old_data(&mut self, retention_days: u32) -> TelemetryResult<u64> {
        if self.is_noop {
            return Ok(0);
        }

        let conn = self.get_conn()?;
        let cutoff = format!("-{} days", retention_days);

        let mut total_deleted = 0u64;

        let deleted = conn
            .execute(
                "DELETE FROM queries WHERE timestamp < datetime('now', ?1)",
                params![cutoff],
            )
            .map_err(|e| TelemetryError::Database(e.to_string()))?;
        total_deleted += deleted as u64;

        let deleted = conn
            .execute(
                "DELETE FROM feedback WHERE timestamp < datetime('now', ?1)",
                params![cutoff],
            )
            .map_err(|e| TelemetryError::Database(e.to_string()))?;
        total_deleted += deleted as u64;

        let deleted = conn
            .execute(
                "DELETE FROM reasoning_traces WHERE timestamp < datetime('now', ?1)",
                params![cutoff],
            )
            .map_err(|e| TelemetryError::Database(e.to_string()))?;
        total_deleted += deleted as u64;

        tracing::info!(
            retention_days = retention_days,
            deleted = total_deleted,
            "Pruned old telemetry data"
        );

        Ok(total_deleted)
    }
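    /// Runs SQLite `VACUUM` to reclaim unused space.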
    pub async fn vacuum(&mut self) -> TelemetryResult<()> {
        if self.is_noop {
            return Ok(());
        }

        let conn = self.get_conn()?;
        conn.execute("VACUUM", [])
            .map_err(|e| TelemetryError::Database(e.to_string()))?;

        tracing::info!("Vacuumed telemetry database");

        Ok(())
    }
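    /// Returns the total number of stored query events.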
    pub async fn get_query_count(&self) -> TelemetryResult<u64> {
        if self.is_noop {
            return Ok(0);
        }

        let conn = self.get_conn()?;
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM queries", [], |row| row.get(0))
            .map_err(|e| TelemetryError::Database(e.to_string()))?;

        Ok(count as u64)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::QueryType;
    use uuid::Uuid;
    #[tokio::test]
    async fn test_in_memory_storage() {
        let storage = TelemetryStorage::in_memory().unwrap();
        assert!(!storage.is_noop);

        let count = storage.get_query_count().await.unwrap();
        assert_eq!(count, 0);
    }
    #[tokio::test]
    async fn test_noop_storage() {
        let storage = TelemetryStorage::noop();
        assert!(storage.is_noop);

        let result = storage.get_aggregated_metrics().await;
        assert!(result.is_err());
    }
    fn create_test_session(storage: &TelemetryStorage, session_id: &Uuid) {
        let conn = storage.conn.as_ref().unwrap().lock().unwrap();
        conn.execute(
            "INSERT INTO sessions (id, started_at) VALUES (?1, ?2)",
            params![session_id.to_string(), Utc::now().to_rfc3339()],
        )
        .unwrap();
    }
    #[tokio::test]
    async fn test_insert_and_query_event() {
        let mut storage = TelemetryStorage::in_memory().unwrap();
        let session_id = Uuid::new_v4();

        create_test_session(&storage, &session_id);

        let event = QueryEvent::new(session_id, "test query".to_string())
            .with_type(QueryType::Search)
            .with_latency(100);

        storage.insert_query_event(&event).await.unwrap();

        let count = storage.get_query_count().await.unwrap();
        assert_eq!(count, 1);
    }
    #[tokio::test]
    async fn test_insert_feedback_event() {
        let mut storage = TelemetryStorage::in_memory().unwrap();
        let session_id = Uuid::new_v4();

        create_test_session(&storage, &session_id);

        let feedback = FeedbackEvent::thumbs_up(session_id, None);
        storage.insert_feedback_event(&feedback).await.unwrap();

        let metrics = storage.get_aggregated_metrics().await.unwrap();
        assert_eq!(metrics.feedback_summary.total_feedback, 1);
    }
    #[test]
    fn test_query_hash_consistency() {
        let hash1 = TelemetryStorage::hash_query("test query");
        let hash2 = TelemetryStorage::hash_query("test query");
        assert_eq!(hash1, hash2);

        let hash3 = TelemetryStorage::hash_query("different query");
        assert_ne!(hash1, hash3);
    }
    #[tokio::test]
    async fn test_prune_old_data() {
        let mut storage = TelemetryStorage::in_memory().unwrap();
        let session_id = Uuid::new_v4();

        create_test_session(&storage, &session_id);

        let event = QueryEvent::new(session_id, "test".to_string());
        storage.insert_query_event(&event).await.unwrap();

        let _deleted = storage.prune_old_data(0).await.unwrap();

        // With zero-day retention the just-inserted row may or may not fall
        // before the cutoff, so only assert an upper bound.
        let count = storage.get_query_count().await.unwrap();
        assert!(count <= 1);
    }
    #[test]
    fn test_schema_version_tracking() {
        let storage = TelemetryStorage::in_memory().unwrap();

        let version = storage.schema_version().unwrap();
        assert_eq!(version, TELEMETRY_SCHEMA_VERSION);
    }
    #[test]
    fn test_db_path_accessor() {
        let storage = TelemetryStorage::in_memory().unwrap();
        assert_eq!(storage.db_path(), ":memory:");

        let noop = TelemetryStorage::noop();
        assert_eq!(noop.db_path(), "");
    }
    #[tokio::test]
    async fn test_file_based_storage_with_directory_creation() {
        use std::fs;

        let temp_dir = std::env::temp_dir().join("reasonkit_test_telemetry");
        let db_path = temp_dir.join("nested").join("dir").join("test.db");

        if temp_dir.exists() {
            fs::remove_dir_all(&temp_dir).ok();
        }

        let storage = TelemetryStorage::new(&db_path).await.unwrap();

        assert!(db_path.parent().unwrap().exists());

        let version = storage.schema_version().unwrap();
        assert_eq!(version, TELEMETRY_SCHEMA_VERSION);

        drop(storage);
        fs::remove_dir_all(&temp_dir).ok();
    }
    #[tokio::test]
    async fn test_schema_migration_idempotent() {
        use std::fs;

        let temp_dir = std::env::temp_dir().join("reasonkit_test_migration");
        let db_path = temp_dir.join("migration_test.db");

        if temp_dir.exists() {
            fs::remove_dir_all(&temp_dir).ok();
        }

        {
            let storage = TelemetryStorage::new(&db_path).await.unwrap();
            assert_eq!(storage.schema_version().unwrap(), TELEMETRY_SCHEMA_VERSION);
        }

        {
            let storage = TelemetryStorage::new(&db_path).await.unwrap();
            assert_eq!(storage.schema_version().unwrap(), TELEMETRY_SCHEMA_VERSION);
        }

        fs::remove_dir_all(&temp_dir).ok();
    }
}