use crate::error::{AnalyticsError, Result};
use crate::models::{
    AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
    EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
    PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
};
use futures::TryStreamExt;
use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
use std::path::Path;
use tracing::{debug, error, info};

/// Handle to the analytics SQLite database.
///
/// Cloning is cheap: the handle wraps a shared connection pool.
#[derive(Clone)]
pub struct AnalyticsDatabase {
    pool: Pool<Sqlite>,
}

impl AnalyticsDatabase {
    /// Opens a connection pool to the analytics database at `database_path`.
    ///
    /// Passing `:memory:` opens an in-memory database, which is useful for tests.
    pub async fn new(database_path: &Path) -> Result<Self> {
        let db_url = if database_path.to_str() == Some(":memory:") {
            "sqlite::memory:".to_string()
        } else {
            format!("sqlite://{}", database_path.display())
        };

        info!("Connecting to analytics database: {}", db_url);

        let pool = SqlitePoolOptions::new()
            .max_connections(10)
            .connect(&db_url)
            .await
            .map_err(|e| {
                error!("Failed to connect to analytics database: {}", e);
                AnalyticsError::Database(e)
            })?;

        // Enable write-ahead logging for better concurrent read/write performance.
        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;

        // Enforce foreign key constraints (off by default in SQLite).
        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;

        Ok(Self { pool })
    }

    /// Runs the embedded schema migrations in order.
    pub async fn run_migrations(&self) -> Result<()> {
        info!("Running analytics database migrations");

        // Each migration script may contain multiple statements, so drive the
        // `execute_many` stream to completion for every script.
        let migrations = [
            ("analytics schema", include_str!("../migrations/001_analytics_schema.sql")),
            ("coverage metrics", include_str!("../migrations/002_coverage_metrics.sql")),
            ("pillar usage", include_str!("../migrations/002_pillar_usage.sql")),
        ];

        for (name, sql) in migrations {
            let mut conn = self.pool.acquire().await?;
            let mut stream = conn.execute_many(sql);

            while stream
                .try_next()
                .await
                .map_err(|e| {
                    error!("{} migration error: {}", name, e);
                    AnalyticsError::Migration(format!("Failed to execute {name} migration: {e}"))
                })?
                .is_some()
            {}
        }

        info!("Analytics database migrations completed successfully");
        Ok(())
    }

    /// Returns a reference to the underlying connection pool.
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }

    /// Inserts a single minute-level metrics aggregate.
    ///
    /// Returns the rowid of the inserted record.
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Inserts a batch of minute-level aggregates inside a single transaction,
    /// so either all rows are written or none are.
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        if aggregates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }

    /// Inserts a single hour-level metrics aggregate and returns its rowid.
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Inserts a single day-level metrics aggregate and returns its rowid.
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Inserts or updates cumulative statistics for an endpoint.
    ///
    /// On conflict, request/error counts and byte totals accumulate, while the
    /// latency summaries and status-code breakdown take the incoming values.
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Records a single error event and returns its rowid.
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Inserts or updates an hourly traffic-pattern bucket; on conflict,
    /// request and error counts accumulate while the latency average and
    /// unique-client count take the incoming values.
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Stores a point-in-time analytics snapshot and returns its rowid.
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Fetches minute-level aggregates matching `filter`, newest first.
    ///
    /// WHERE clauses and their binds are appended in the same order, so the
    /// two blocks below must stay in sync.
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Fetches hour-level aggregates matching `filter`, newest first.
    ///
    /// Follows the same clause/bind ordering convention as
    /// [`Self::get_minute_aggregates`].
    pub async fn get_hour_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<HourMetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Returns up to `limit` endpoints ordered by total request count,
    /// optionally scoped to a workspace.
    pub async fn get_top_endpoints(
        &self,
        limit: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<EndpointStats>> {
        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY total_requests DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);

        if let Some(workspace) = workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Returns the most recent error events matching `filter`, newest first,
    /// capped at `limit`.
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Returns traffic patterns for the last `days` days, ordered by date and
    /// hour, optionally scoped to a workspace.
    pub async fn get_traffic_patterns(
        &self,
        days: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<TrafficPattern>> {
        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
        let start_date_str = start_date.format("%Y-%m-%d").to_string();

        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY date ASC, hour ASC");

        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);

        if let Some(workspace) = workspace_id {
            query_builder = query_builder.bind(workspace);
        }

        let results = query_builder.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Deletes minute-level aggregates older than `days` days and returns the
    /// number of rows removed.
    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!(
            "Cleaned up {} minute aggregates older than {} days",
            result.rows_affected(),
            days
        );
        Ok(result.rows_affected())
    }

    /// Deletes hour-level aggregates older than `days` days and returns the
    /// number of rows removed.
    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

    /// Deletes error events older than `days` days and returns the number of
    /// rows removed.
    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);

        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

    /// Runs `VACUUM` to reclaim disk space after large deletions.
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        };

        let id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(id > 0);
    }
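
    // Test-local helper (not part of the production API): builds a plausible
    // minute aggregate for the given timestamp; field values are illustrative.
    fn sample_aggregate(timestamp: i64) -> MetricsAggregate {
        MetricsAggregate {
            id: None,
            timestamp,
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        }
    }

    // A sketch of the batch path plus retention cleanup: two rows are written
    // in one transaction, then a 1-day retention sweep removes only the stale row.
    #[tokio::test]
    async fn test_batch_insert_and_cleanup() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let now = chrono::Utc::now().timestamp();
        let fresh = sample_aggregate(now);
        let stale = sample_aggregate(now - 10 * 86_400); // ten days old

        db.insert_minute_aggregates_batch(&[fresh, stale]).await.unwrap();

        // Retain one day of minute aggregates; only the stale row qualifies.
        let removed = db.cleanup_minute_aggregates(1).await.unwrap();
        assert_eq!(removed, 1);
    }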
}

// Pillar-usage metrics: scenario usage, persona CI hits, endpoint coverage,
// reality-level staleness, and drift tracking.
impl AnalyticsDatabase {
    /// Records one use of a scenario: increments `usage_count` for an existing
    /// row, or inserts a fresh row on first use (an UPDATE-then-INSERT upsert).
    pub async fn record_scenario_usage(
        &self,
        scenario_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // NULL-safe matching: a row matches when the column equals the
        // argument, or both are NULL.
        let update_result = sqlx::query(
            "UPDATE scenario_usage_metrics
             SET usage_count = usage_count + 1,
                 last_used_at = ?,
                 updated_at = ?
             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
                 AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(now)
        .bind(scenario_id)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        if update_result.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
                 VALUES (?, ?, ?, 1, ?, ?, ?)"
            )
            .bind(scenario_id)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }

    /// Records a persona hit from a CI run; each call inserts a new row.
    pub async fn record_persona_ci_hit(
        &self,
        persona_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        ci_run_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        sqlx::query(
            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
             VALUES (?, ?, ?, ?, 1, ?)"
        )
        .bind(persona_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(ci_run_id)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Records a test hit against an endpoint: increments `test_count` for an
    /// existing row (updating the coverage percentage when one is supplied),
    /// or inserts a fresh row on first sight.
    pub async fn record_endpoint_coverage(
        &self,
        endpoint: &str,
        method: Option<&str>,
        protocol: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        coverage_percentage: Option<f64>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        let update_result = sqlx::query(
            "UPDATE endpoint_coverage
             SET test_count = test_count + 1,
                 last_tested_at = ?,
                 coverage_percentage = COALESCE(?, coverage_percentage),
                 updated_at = ?
             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
                 AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
                 AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(coverage_percentage)
        .bind(now)
        .bind(endpoint)
        .bind(method)
        .bind(method)
        .bind(protocol)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        if update_result.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
            )
            .bind(endpoint)
            .bind(method)
            .bind(protocol)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(coverage_percentage)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }

    /// Records the reality level observed for an endpoint. When
    /// `staleness_days` is provided, `last_updated_at` is back-dated by that
    /// many days; existing rows are left untouched (`ON CONFLICT DO NOTHING`).
    pub async fn record_reality_level_staleness(
        &self,
        workspace_id: &str,
        org_id: Option<&str>,
        endpoint: Option<&str>,
        method: Option<&str>,
        protocol: Option<&str>,
        current_reality_level: Option<&str>,
        staleness_days: Option<i32>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();
        let last_updated = staleness_days.map_or(now, |days| now - i64::from(days) * 86400);

        sqlx::query(
            "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT DO NOTHING"
        )
        .bind(workspace_id)
        .bind(org_id)
        .bind(endpoint)
        .bind(method)
        .bind(protocol)
        .bind(current_reality_level)
        .bind(last_updated)
        .bind(staleness_days)
        .bind(now)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Computes and stores the share of mocks currently drifting, as a
    /// percentage of `total_mocks` (0.0 when there are no mocks).
    pub async fn record_drift_percentage(
        &self,
        workspace_id: &str,
        org_id: Option<&str>,
        total_mocks: i64,
        drifting_mocks: i64,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();
        let drift_percentage = if total_mocks > 0 {
            (drifting_mocks as f64 / total_mocks as f64) * 100.0
        } else {
            0.0
        };

        sqlx::query(
            "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
             VALUES (?, ?, ?, ?, ?, ?)"
        )
        .bind(workspace_id)
        .bind(org_id)
        .bind(total_mocks)
        .bind(drifting_mocks)
        .bind(drift_percentage)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Returns scenario usage metrics, most used first (default limit 100).
    pub async fn get_scenario_usage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<ScenarioUsageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
             FROM scenario_usage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY usage_count DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Returns persona CI hits, newest first (default limit 100).
    pub async fn get_persona_ci_hits(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<PersonaCIHit>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
             FROM persona_ci_hits
             WHERE 1=1",
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY hit_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Returns endpoint coverage rows, least covered first. When
    /// `min_coverage` is given, only endpoints below that percentage (or with
    /// no recorded coverage) are included.
    pub async fn get_endpoint_coverage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        min_coverage: Option<f64>,
    ) -> Result<Vec<EndpointCoverage>> {
        let mut query = String::from(
            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
             FROM endpoint_coverage
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if min_coverage.is_some() {
            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
        }
        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");

        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(min_cov) = min_coverage {
            q = q.bind(min_cov);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Returns reality-level staleness rows, stalest first. When
    /// `max_staleness_days` is given, only entries exceeding that many days
    /// (or with unknown staleness) are included.
    pub async fn get_reality_level_staleness(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        max_staleness_days: Option<i32>,
    ) -> Result<Vec<RealityLevelStaleness>> {
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
             FROM reality_level_staleness
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if max_staleness_days.is_some() {
            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
        }
        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");

        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(max_days) = max_staleness_days {
            q = q.bind(max_days);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Returns drift-percentage measurements, newest first (default limit 100).
    pub async fn get_drift_percentage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<DriftPercentageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
             FROM drift_percentage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY measured_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
}
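
#[cfg(test)]
mod pillar_usage_tests {
    use super::*;

    // A sketch of the scenario-usage upsert: the first call inserts a row, the
    // second increments it, and the query should surface a single record. The
    // `usage_count` field name comes from the SELECT list in
    // `get_scenario_usage`; its exact integer type is assumed to be non-optional.
    #[tokio::test]
    async fn test_record_and_get_scenario_usage() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        db.record_scenario_usage("scenario-1", Some("ws-1"), None).await.unwrap();
        db.record_scenario_usage("scenario-1", Some("ws-1"), None).await.unwrap();

        let usage = db.get_scenario_usage(Some("ws-1"), None, None).await.unwrap();
        assert_eq!(usage.len(), 1);
        assert_eq!(usage[0].usage_count, 2);
    }

    // Drift percentage is computed at record time as drifting / total * 100
    // (30.0 for the values below); this just checks that the measurement
    // round-trips through `get_drift_percentage`.
    #[tokio::test]
    async fn test_record_and_get_drift_percentage() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        db.record_drift_percentage("ws-1", None, 10, 3).await.unwrap();

        let metrics = db.get_drift_percentage(Some("ws-1"), None, None).await.unwrap();
        assert_eq!(metrics.len(), 1);
    }
}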