1use crate::error::{AnalyticsError, Result};
4use crate::models::{
5 AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
6 EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
7 PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
8};
9use futures::TryStreamExt;
10use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
11use std::path::Path;
12use tracing::{debug, error, info};
13
14#[derive(Clone)]
16pub struct AnalyticsDatabase {
17 pool: Pool<Sqlite>,
18}
19
20impl AnalyticsDatabase {
21 pub async fn new(database_path: &Path) -> Result<Self> {
26 let db_url = if database_path.to_str() == Some(":memory:") {
27 "sqlite::memory:".to_string()
28 } else {
29 format!("sqlite://{}", database_path.display())
30 };
31
32 info!("Connecting to analytics database: {}", db_url);
33
34 let pool =
35 SqlitePoolOptions::new()
36 .max_connections(10)
37 .connect(&db_url)
38 .await
39 .map_err(|e| {
40 error!("Failed to connect to analytics database: {}", e);
41 AnalyticsError::Database(e)
42 })?;
43
44 sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;
46
47 sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;
49
50 Ok(Self { pool })
51 }
52
53 pub async fn run_migrations(&self) -> Result<()> {
55 info!("Running analytics database migrations");
56
57 let migration_sql = include_str!("../migrations/001_analytics_schema.sql");
59 let mut conn = self.pool.acquire().await?;
60 let mut stream = conn.execute_many(migration_sql);
61
62 while let Some(_) = stream.try_next().await.map_err(|e| {
63 error!("Migration error: {}", e);
64 AnalyticsError::Migration(format!("Failed to execute migration: {e}"))
65 })? {}
66
67 let coverage_migration_sql = include_str!("../migrations/002_coverage_metrics.sql");
69 let mut conn = self.pool.acquire().await?;
70 let mut stream = conn.execute_many(coverage_migration_sql);
71
72 while let Some(_) = stream.try_next().await.map_err(|e| {
73 error!("Coverage metrics migration error: {}", e);
74 AnalyticsError::Migration(format!("Failed to execute coverage metrics migration: {e}"))
75 })? {}
76
77 info!("Analytics database migrations completed successfully");
78 Ok(())
79 }
80
81 #[must_use]
83 pub const fn pool(&self) -> &SqlitePool {
84 &self.pool
85 }
86
87 pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
93 let result = sqlx::query(
94 "INSERT INTO metrics_aggregates_minute (
95 timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
96 request_count, error_count, latency_sum, latency_min, latency_max,
97 latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
98 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
99 )
100 .bind(agg.timestamp)
101 .bind(&agg.protocol)
102 .bind(&agg.method)
103 .bind(&agg.endpoint)
104 .bind(agg.status_code)
105 .bind(&agg.workspace_id)
106 .bind(&agg.environment)
107 .bind(agg.request_count)
108 .bind(agg.error_count)
109 .bind(agg.latency_sum)
110 .bind(agg.latency_min)
111 .bind(agg.latency_max)
112 .bind(agg.latency_p50)
113 .bind(agg.latency_p95)
114 .bind(agg.latency_p99)
115 .bind(agg.bytes_sent)
116 .bind(agg.bytes_received)
117 .bind(agg.active_connections)
118 .execute(&self.pool)
119 .await?;
120
121 Ok(result.last_insert_rowid())
122 }
123
124 pub async fn insert_minute_aggregates_batch(
126 &self,
127 aggregates: &[MetricsAggregate],
128 ) -> Result<()> {
129 if aggregates.is_empty() {
130 return Ok(());
131 }
132
133 let mut tx = self.pool.begin().await?;
134
135 for agg in aggregates {
136 sqlx::query(
137 r"
138 INSERT INTO metrics_aggregates_minute (
139 timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
140 request_count, error_count, latency_sum, latency_min, latency_max,
141 latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
142 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
143 ",
144 )
145 .bind(agg.timestamp)
146 .bind(&agg.protocol)
147 .bind(&agg.method)
148 .bind(&agg.endpoint)
149 .bind(agg.status_code)
150 .bind(&agg.workspace_id)
151 .bind(&agg.environment)
152 .bind(agg.request_count)
153 .bind(agg.error_count)
154 .bind(agg.latency_sum)
155 .bind(agg.latency_min)
156 .bind(agg.latency_max)
157 .bind(agg.latency_p50)
158 .bind(agg.latency_p95)
159 .bind(agg.latency_p99)
160 .bind(agg.bytes_sent)
161 .bind(agg.bytes_received)
162 .bind(agg.active_connections)
163 .execute(&mut *tx)
164 .await?;
165 }
166
167 tx.commit().await?;
168 debug!("Inserted {} minute aggregates", aggregates.len());
169 Ok(())
170 }
171
172 pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
174 let result = sqlx::query(
175 r"
176 INSERT INTO metrics_aggregates_hour (
177 timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
178 request_count, error_count, latency_sum, latency_min, latency_max,
179 latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
180 active_connections_avg, active_connections_max
181 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
182 ",
183 )
184 .bind(agg.timestamp)
185 .bind(&agg.protocol)
186 .bind(&agg.method)
187 .bind(&agg.endpoint)
188 .bind(agg.status_code)
189 .bind(&agg.workspace_id)
190 .bind(&agg.environment)
191 .bind(agg.request_count)
192 .bind(agg.error_count)
193 .bind(agg.latency_sum)
194 .bind(agg.latency_min)
195 .bind(agg.latency_max)
196 .bind(agg.latency_p50)
197 .bind(agg.latency_p95)
198 .bind(agg.latency_p99)
199 .bind(agg.bytes_sent)
200 .bind(agg.bytes_received)
201 .bind(agg.active_connections_avg)
202 .bind(agg.active_connections_max)
203 .execute(&self.pool)
204 .await?;
205
206 Ok(result.last_insert_rowid())
207 }
208
209 pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
211 let result = sqlx::query(
212 r"
213 INSERT INTO metrics_aggregates_day (
214 date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
215 request_count, error_count, latency_sum, latency_min, latency_max,
216 latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
217 active_connections_avg, active_connections_max, unique_clients, peak_hour
218 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
219 ",
220 )
221 .bind(&agg.date)
222 .bind(agg.timestamp)
223 .bind(&agg.protocol)
224 .bind(&agg.method)
225 .bind(&agg.endpoint)
226 .bind(agg.status_code)
227 .bind(&agg.workspace_id)
228 .bind(&agg.environment)
229 .bind(agg.request_count)
230 .bind(agg.error_count)
231 .bind(agg.latency_sum)
232 .bind(agg.latency_min)
233 .bind(agg.latency_max)
234 .bind(agg.latency_p50)
235 .bind(agg.latency_p95)
236 .bind(agg.latency_p99)
237 .bind(agg.bytes_sent)
238 .bind(agg.bytes_received)
239 .bind(agg.active_connections_avg)
240 .bind(agg.active_connections_max)
241 .bind(agg.unique_clients)
242 .bind(agg.peak_hour)
243 .execute(&self.pool)
244 .await?;
245
246 Ok(result.last_insert_rowid())
247 }
248
249 pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
251 sqlx::query(
252 r"
253 INSERT INTO endpoint_stats (
254 endpoint, protocol, method, workspace_id, environment,
255 total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
256 p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
257 first_seen, last_seen
258 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
259 ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
260 DO UPDATE SET
261 total_requests = total_requests + excluded.total_requests,
262 total_errors = total_errors + excluded.total_errors,
263 avg_latency_ms = excluded.avg_latency_ms,
264 min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
265 max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
266 p95_latency_ms = excluded.p95_latency_ms,
267 status_codes = excluded.status_codes,
268 total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
269 total_bytes_received = total_bytes_received + excluded.total_bytes_received,
270 last_seen = excluded.last_seen,
271 updated_at = strftime('%s', 'now')
272 ",
273 )
274 .bind(&stats.endpoint)
275 .bind(&stats.protocol)
276 .bind(&stats.method)
277 .bind(&stats.workspace_id)
278 .bind(&stats.environment)
279 .bind(stats.total_requests)
280 .bind(stats.total_errors)
281 .bind(stats.avg_latency_ms)
282 .bind(stats.min_latency_ms)
283 .bind(stats.max_latency_ms)
284 .bind(stats.p95_latency_ms)
285 .bind(&stats.status_codes)
286 .bind(stats.total_bytes_sent)
287 .bind(stats.total_bytes_received)
288 .bind(stats.first_seen)
289 .bind(stats.last_seen)
290 .execute(&self.pool)
291 .await?;
292
293 Ok(())
294 }
295
296 pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
298 let result = sqlx::query(
299 r"
300 INSERT INTO error_events (
301 timestamp, protocol, method, endpoint, status_code,
302 error_type, error_message, error_category,
303 request_id, trace_id, span_id,
304 client_ip, user_agent, workspace_id, environment, metadata
305 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
306 ",
307 )
308 .bind(error.timestamp)
309 .bind(&error.protocol)
310 .bind(&error.method)
311 .bind(&error.endpoint)
312 .bind(error.status_code)
313 .bind(&error.error_type)
314 .bind(&error.error_message)
315 .bind(&error.error_category)
316 .bind(&error.request_id)
317 .bind(&error.trace_id)
318 .bind(&error.span_id)
319 .bind(&error.client_ip)
320 .bind(&error.user_agent)
321 .bind(&error.workspace_id)
322 .bind(&error.environment)
323 .bind(&error.metadata)
324 .execute(&self.pool)
325 .await?;
326
327 Ok(result.last_insert_rowid())
328 }
329
330 pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
332 sqlx::query(
333 r"
334 INSERT INTO traffic_patterns (
335 date, hour, day_of_week, protocol, workspace_id, environment,
336 request_count, error_count, avg_latency_ms, unique_clients
337 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
338 ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
339 DO UPDATE SET
340 request_count = request_count + excluded.request_count,
341 error_count = error_count + excluded.error_count,
342 avg_latency_ms = excluded.avg_latency_ms,
343 unique_clients = excluded.unique_clients
344 ",
345 )
346 .bind(&pattern.date)
347 .bind(pattern.hour)
348 .bind(pattern.day_of_week)
349 .bind(&pattern.protocol)
350 .bind(&pattern.workspace_id)
351 .bind(&pattern.environment)
352 .bind(pattern.request_count)
353 .bind(pattern.error_count)
354 .bind(pattern.avg_latency_ms)
355 .bind(pattern.unique_clients)
356 .execute(&self.pool)
357 .await?;
358
359 Ok(())
360 }
361
362 pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
364 let result = sqlx::query(
365 r"
366 INSERT INTO analytics_snapshots (
367 timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
368 active_connections, protocol_stats, top_endpoints,
369 memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
370 workspace_id, environment
371 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
372 ",
373 )
374 .bind(snapshot.timestamp)
375 .bind(&snapshot.snapshot_type)
376 .bind(snapshot.total_requests)
377 .bind(snapshot.total_errors)
378 .bind(snapshot.avg_latency_ms)
379 .bind(snapshot.active_connections)
380 .bind(&snapshot.protocol_stats)
381 .bind(&snapshot.top_endpoints)
382 .bind(snapshot.memory_usage_bytes)
383 .bind(snapshot.cpu_usage_percent)
384 .bind(snapshot.thread_count)
385 .bind(snapshot.uptime_seconds)
386 .bind(&snapshot.workspace_id)
387 .bind(&snapshot.environment)
388 .execute(&self.pool)
389 .await?;
390
391 Ok(result.last_insert_rowid())
392 }
393
394 pub async fn get_minute_aggregates(
400 &self,
401 filter: &AnalyticsFilter,
402 ) -> Result<Vec<MetricsAggregate>> {
403 let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");
404
405 if filter.start_time.is_some() {
406 query.push_str(" AND timestamp >= ?");
407 }
408 if filter.end_time.is_some() {
409 query.push_str(" AND timestamp <= ?");
410 }
411 if filter.protocol.is_some() {
412 query.push_str(" AND protocol = ?");
413 }
414 if filter.endpoint.is_some() {
415 query.push_str(" AND endpoint = ?");
416 }
417 if filter.method.is_some() {
418 query.push_str(" AND method = ?");
419 }
420 if filter.status_code.is_some() {
421 query.push_str(" AND status_code = ?");
422 }
423 if filter.workspace_id.is_some() {
424 query.push_str(" AND workspace_id = ?");
425 }
426 if filter.environment.is_some() {
427 query.push_str(" AND environment = ?");
428 }
429
430 query.push_str(" ORDER BY timestamp DESC");
431
432 if filter.limit.is_some() {
433 query.push_str(" LIMIT ?");
434 }
435
436 let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);
438
439 if let Some(start) = filter.start_time {
440 sql_query = sql_query.bind(start);
441 }
442 if let Some(end) = filter.end_time {
443 sql_query = sql_query.bind(end);
444 }
445 if let Some(ref protocol) = filter.protocol {
446 sql_query = sql_query.bind(protocol);
447 }
448 if let Some(ref endpoint) = filter.endpoint {
449 sql_query = sql_query.bind(endpoint);
450 }
451 if let Some(ref method) = filter.method {
452 sql_query = sql_query.bind(method);
453 }
454 if let Some(status) = filter.status_code {
455 sql_query = sql_query.bind(status);
456 }
457 if let Some(ref workspace) = filter.workspace_id {
458 sql_query = sql_query.bind(workspace);
459 }
460 if let Some(ref env) = filter.environment {
461 sql_query = sql_query.bind(env);
462 }
463 if let Some(limit) = filter.limit {
464 sql_query = sql_query.bind(limit);
465 }
466
467 let results = sql_query.fetch_all(&self.pool).await?;
468
469 Ok(results)
470 }
471
472 pub async fn get_hour_aggregates(
474 &self,
475 filter: &AnalyticsFilter,
476 ) -> Result<Vec<HourMetricsAggregate>> {
477 let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");
478
479 if filter.start_time.is_some() {
480 query.push_str(" AND timestamp >= ?");
481 }
482 if filter.end_time.is_some() {
483 query.push_str(" AND timestamp <= ?");
484 }
485 if filter.protocol.is_some() {
486 query.push_str(" AND protocol = ?");
487 }
488 if filter.endpoint.is_some() {
489 query.push_str(" AND endpoint = ?");
490 }
491 if filter.method.is_some() {
492 query.push_str(" AND method = ?");
493 }
494 if filter.status_code.is_some() {
495 query.push_str(" AND status_code = ?");
496 }
497 if filter.workspace_id.is_some() {
498 query.push_str(" AND workspace_id = ?");
499 }
500 if filter.environment.is_some() {
501 query.push_str(" AND environment = ?");
502 }
503
504 query.push_str(" ORDER BY timestamp DESC");
505
506 if filter.limit.is_some() {
507 query.push_str(" LIMIT ?");
508 }
509
510 let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);
512
513 if let Some(start) = filter.start_time {
514 sql_query = sql_query.bind(start);
515 }
516 if let Some(end) = filter.end_time {
517 sql_query = sql_query.bind(end);
518 }
519 if let Some(ref protocol) = filter.protocol {
520 sql_query = sql_query.bind(protocol);
521 }
522 if let Some(ref endpoint) = filter.endpoint {
523 sql_query = sql_query.bind(endpoint);
524 }
525 if let Some(ref method) = filter.method {
526 sql_query = sql_query.bind(method);
527 }
528 if let Some(status) = filter.status_code {
529 sql_query = sql_query.bind(status);
530 }
531 if let Some(ref workspace) = filter.workspace_id {
532 sql_query = sql_query.bind(workspace);
533 }
534 if let Some(ref env) = filter.environment {
535 sql_query = sql_query.bind(env);
536 }
537 if let Some(limit) = filter.limit {
538 sql_query = sql_query.bind(limit);
539 }
540
541 let results = sql_query.fetch_all(&self.pool).await?;
542
543 Ok(results)
544 }
545
546 pub async fn get_top_endpoints(
548 &self,
549 limit: i64,
550 workspace_id: Option<&str>,
551 ) -> Result<Vec<EndpointStats>> {
552 let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
553
554 if workspace_id.is_some() {
555 query.push_str(" AND workspace_id = ?");
556 }
557
558 query.push_str(" ORDER BY total_requests DESC LIMIT ?");
559
560 let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
561
562 if let Some(workspace) = workspace_id {
563 sql_query = sql_query.bind(workspace);
564 }
565
566 sql_query = sql_query.bind(limit);
567
568 let results = sql_query.fetch_all(&self.pool).await?;
569
570 Ok(results)
571 }
572
573 pub async fn get_recent_errors(
575 &self,
576 limit: i64,
577 filter: &AnalyticsFilter,
578 ) -> Result<Vec<ErrorEvent>> {
579 let mut query = String::from("SELECT * FROM error_events WHERE 1=1");
580
581 if filter.start_time.is_some() {
582 query.push_str(" AND timestamp >= ?");
583 }
584 if filter.end_time.is_some() {
585 query.push_str(" AND timestamp <= ?");
586 }
587 if filter.endpoint.is_some() {
588 query.push_str(" AND endpoint = ?");
589 }
590 if filter.workspace_id.is_some() {
591 query.push_str(" AND workspace_id = ?");
592 }
593
594 query.push_str(" ORDER BY timestamp DESC LIMIT ?");
595
596 let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);
597
598 if let Some(start) = filter.start_time {
599 sql_query = sql_query.bind(start);
600 }
601 if let Some(end) = filter.end_time {
602 sql_query = sql_query.bind(end);
603 }
604 if let Some(ref endpoint) = filter.endpoint {
605 sql_query = sql_query.bind(endpoint);
606 }
607 if let Some(ref workspace) = filter.workspace_id {
608 sql_query = sql_query.bind(workspace);
609 }
610
611 sql_query = sql_query.bind(limit);
612
613 let results = sql_query.fetch_all(&self.pool).await?;
614
615 Ok(results)
616 }
617
618 pub async fn get_traffic_patterns(
620 &self,
621 days: i64,
622 workspace_id: Option<&str>,
623 ) -> Result<Vec<TrafficPattern>> {
624 let start_date = chrono::Utc::now() - chrono::Duration::days(days);
625 let start_date_str = start_date.format("%Y-%m-%d").to_string();
626
627 let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
628
629 if let Some(_workspace) = workspace_id {
630 query.push_str(" AND workspace_id = ?");
631 }
632
633 query.push_str(" ORDER BY date ASC, hour ASC");
634
635 let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
636
637 if let Some(workspace) = workspace_id {
638 query_builder = query_builder.bind(workspace);
639 }
640
641 let results = query_builder.fetch_all(&self.pool).await?;
642
643 Ok(results)
644 }
645
646 pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
652 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
653
654 let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
655 .bind(cutoff)
656 .execute(&self.pool)
657 .await?;
658
659 info!(
660 "Cleaned up {} minute aggregates older than {} days",
661 result.rows_affected(),
662 days
663 );
664 Ok(result.rows_affected())
665 }
666
667 pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
669 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
670
671 let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
672 .bind(cutoff)
673 .execute(&self.pool)
674 .await?;
675
676 info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
677 Ok(result.rows_affected())
678 }
679
680 pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
682 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
683
684 let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
685 .bind(cutoff)
686 .execute(&self.pool)
687 .await?;
688
689 info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
690 Ok(result.rows_affected())
691 }
692
693 pub async fn vacuum(&self) -> Result<()> {
695 info!("Running VACUUM on analytics database");
696 sqlx::query("VACUUM").execute(&self.pool).await?;
697 info!("VACUUM completed");
698 Ok(())
699 }
700}
701
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an in-memory database with every migration applied.
    async fn migrated_memory_db() -> AnalyticsDatabase {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
        db
    }

    /// Connecting to `:memory:` and running the migrations must succeed.
    #[tokio::test]
    async fn test_database_creation() {
        let _db = migrated_memory_db().await;
    }

    /// Inserting one fully populated minute aggregate yields a positive rowid.
    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = migrated_memory_db().await;

        let sample = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        };

        let row_id = db.insert_minute_aggregate(&sample).await.unwrap();
        assert!(row_id > 0);
    }
}
744
745impl AnalyticsDatabase {
750 pub async fn record_scenario_usage(
752 &self,
753 scenario_id: &str,
754 workspace_id: Option<&str>,
755 org_id: Option<&str>,
756 ) -> Result<()> {
757 let now = chrono::Utc::now().timestamp();
758
759 let rows_affected = sqlx::query(
762 "UPDATE scenario_usage_metrics
763 SET usage_count = usage_count + 1,
764 last_used_at = ?,
765 updated_at = ?
766 WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
767 AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
768 )
769 .bind(now)
770 .bind(now)
771 .bind(scenario_id)
772 .bind(workspace_id)
773 .bind(workspace_id)
774 .bind(org_id)
775 .bind(org_id)
776 .execute(&self.pool)
777 .await?;
778
779 if rows_affected.rows_affected() == 0 {
781 sqlx::query(
782 "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
783 VALUES (?, ?, ?, 1, ?, ?, ?)"
784 )
785 .bind(scenario_id)
786 .bind(workspace_id)
787 .bind(org_id)
788 .bind(now)
789 .bind(now)
790 .bind(now)
791 .execute(&self.pool)
792 .await?;
793 }
794
795 Ok(())
796 }
797
798 pub async fn record_persona_ci_hit(
800 &self,
801 persona_id: &str,
802 workspace_id: Option<&str>,
803 org_id: Option<&str>,
804 ci_run_id: Option<&str>,
805 ) -> Result<()> {
806 let now = chrono::Utc::now().timestamp();
807
808 sqlx::query(
809 "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
810 VALUES (?, ?, ?, ?, 1, ?)"
811 )
812 .bind(persona_id)
813 .bind(workspace_id)
814 .bind(org_id)
815 .bind(ci_run_id)
816 .bind(now)
817 .execute(&self.pool)
818 .await?;
819
820 Ok(())
821 }
822
823 pub async fn record_endpoint_coverage(
825 &self,
826 endpoint: &str,
827 method: Option<&str>,
828 protocol: &str,
829 workspace_id: Option<&str>,
830 org_id: Option<&str>,
831 coverage_percentage: Option<f64>,
832 ) -> Result<()> {
833 let now = chrono::Utc::now().timestamp();
834
835 let rows_affected = sqlx::query(
837 "UPDATE endpoint_coverage
838 SET test_count = test_count + 1,
839 last_tested_at = ?,
840 coverage_percentage = COALESCE(?, coverage_percentage),
841 updated_at = ?
842 WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
843 AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
844 AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
845 )
846 .bind(now)
847 .bind(coverage_percentage)
848 .bind(now)
849 .bind(endpoint)
850 .bind(method)
851 .bind(method)
852 .bind(protocol)
853 .bind(workspace_id)
854 .bind(workspace_id)
855 .bind(org_id)
856 .bind(org_id)
857 .execute(&self.pool)
858 .await?;
859
860 if rows_affected.rows_affected() == 0 {
862 sqlx::query(
863 "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
864 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
865 )
866 .bind(endpoint)
867 .bind(method)
868 .bind(protocol)
869 .bind(workspace_id)
870 .bind(org_id)
871 .bind(now)
872 .bind(coverage_percentage)
873 .bind(now)
874 .bind(now)
875 .execute(&self.pool)
876 .await?;
877 }
878
879 Ok(())
880 }
881
882 pub async fn record_reality_level_staleness(
884 &self,
885 workspace_id: &str,
886 org_id: Option<&str>,
887 endpoint: Option<&str>,
888 method: Option<&str>,
889 protocol: Option<&str>,
890 current_reality_level: Option<&str>,
891 staleness_days: Option<i32>,
892 ) -> Result<()> {
893 let now = chrono::Utc::now().timestamp();
894 let last_updated = if let Some(days) = staleness_days {
895 Some(now - (i64::from(days) * 86400))
896 } else {
897 Some(now)
898 };
899
900 sqlx::query(
901 "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
902 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
903 ON CONFLICT DO NOTHING"
904 )
905 .bind(workspace_id)
906 .bind(org_id)
907 .bind(endpoint)
908 .bind(method)
909 .bind(protocol)
910 .bind(current_reality_level)
911 .bind(last_updated)
912 .bind(staleness_days)
913 .bind(now)
914 .bind(now)
915 .execute(&self.pool)
916 .await?;
917
918 Ok(())
919 }
920
921 pub async fn record_drift_percentage(
923 &self,
924 workspace_id: &str,
925 org_id: Option<&str>,
926 total_mocks: i64,
927 drifting_mocks: i64,
928 ) -> Result<()> {
929 let now = chrono::Utc::now().timestamp();
930 let drift_percentage = if total_mocks > 0 {
931 (drifting_mocks as f64 / total_mocks as f64) * 100.0
932 } else {
933 0.0
934 };
935
936 sqlx::query(
937 "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
938 VALUES (?, ?, ?, ?, ?, ?)"
939 )
940 .bind(workspace_id)
941 .bind(org_id)
942 .bind(total_mocks)
943 .bind(drifting_mocks)
944 .bind(drift_percentage)
945 .bind(now)
946 .execute(&self.pool)
947 .await?;
948
949 Ok(())
950 }
951
952 pub async fn get_scenario_usage(
954 &self,
955 workspace_id: Option<&str>,
956 org_id: Option<&str>,
957 limit: Option<i64>,
958 ) -> Result<Vec<ScenarioUsageMetrics>> {
959 let limit = limit.unwrap_or(100);
960 let mut query = String::from(
961 "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
962 FROM scenario_usage_metrics
963 WHERE 1=1"
964 );
965
966 if workspace_id.is_some() {
967 query.push_str(" AND workspace_id = ?");
968 }
969 if org_id.is_some() {
970 query.push_str(" AND org_id = ?");
971 }
972 query.push_str(" ORDER BY usage_count DESC LIMIT ?");
973
974 let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
975 if let Some(ws_id) = workspace_id {
976 q = q.bind(ws_id);
977 }
978 if let Some(o_id) = org_id {
979 q = q.bind(o_id);
980 }
981 q = q.bind(limit);
982
983 let results = q.fetch_all(&self.pool).await?;
984 Ok(results)
985 }
986
987 pub async fn get_persona_ci_hits(
989 &self,
990 workspace_id: Option<&str>,
991 org_id: Option<&str>,
992 limit: Option<i64>,
993 ) -> Result<Vec<PersonaCIHit>> {
994 let limit = limit.unwrap_or(100);
995 let mut query = String::from(
996 "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
997 FROM persona_ci_hits
998 WHERE 1=1",
999 );
1000
1001 if workspace_id.is_some() {
1002 query.push_str(" AND workspace_id = ?");
1003 }
1004 if org_id.is_some() {
1005 query.push_str(" AND org_id = ?");
1006 }
1007 query.push_str(" ORDER BY hit_at DESC LIMIT ?");
1008
1009 let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
1010 if let Some(ws_id) = workspace_id {
1011 q = q.bind(ws_id);
1012 }
1013 if let Some(o_id) = org_id {
1014 q = q.bind(o_id);
1015 }
1016 q = q.bind(limit);
1017
1018 let results = q.fetch_all(&self.pool).await?;
1019 Ok(results)
1020 }
1021
1022 pub async fn get_endpoint_coverage(
1024 &self,
1025 workspace_id: Option<&str>,
1026 org_id: Option<&str>,
1027 min_coverage: Option<f64>,
1028 ) -> Result<Vec<EndpointCoverage>> {
1029 let mut query = String::from(
1030 "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
1031 FROM endpoint_coverage
1032 WHERE 1=1"
1033 );
1034
1035 if workspace_id.is_some() {
1036 query.push_str(" AND workspace_id = ?");
1037 }
1038 if org_id.is_some() {
1039 query.push_str(" AND org_id = ?");
1040 }
1041 if min_coverage.is_some() {
1042 query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
1043 }
1044 query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");
1045
1046 let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
1047 if let Some(ws_id) = workspace_id {
1048 q = q.bind(ws_id);
1049 }
1050 if let Some(o_id) = org_id {
1051 q = q.bind(o_id);
1052 }
1053 if let Some(min_cov) = min_coverage {
1054 q = q.bind(min_cov);
1055 }
1056
1057 let results = q.fetch_all(&self.pool).await?;
1058 Ok(results)
1059 }
1060
1061 pub async fn get_reality_level_staleness(
1063 &self,
1064 workspace_id: Option<&str>,
1065 org_id: Option<&str>,
1066 max_staleness_days: Option<i32>,
1067 ) -> Result<Vec<RealityLevelStaleness>> {
1068 let mut query = String::from(
1069 "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
1070 FROM reality_level_staleness
1071 WHERE 1=1"
1072 );
1073
1074 if workspace_id.is_some() {
1075 query.push_str(" AND workspace_id = ?");
1076 }
1077 if org_id.is_some() {
1078 query.push_str(" AND org_id = ?");
1079 }
1080 if max_staleness_days.is_some() {
1081 query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
1082 }
1083 query.push_str(" ORDER BY staleness_days DESC NULLS LAST");
1084
1085 let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
1086 if let Some(ws_id) = workspace_id {
1087 q = q.bind(ws_id);
1088 }
1089 if let Some(o_id) = org_id {
1090 q = q.bind(o_id);
1091 }
1092 if let Some(max_days) = max_staleness_days {
1093 q = q.bind(max_days);
1094 }
1095
1096 let results = q.fetch_all(&self.pool).await?;
1097 Ok(results)
1098 }
1099
1100 pub async fn get_drift_percentage(
1102 &self,
1103 workspace_id: Option<&str>,
1104 org_id: Option<&str>,
1105 limit: Option<i64>,
1106 ) -> Result<Vec<DriftPercentageMetrics>> {
1107 let limit = limit.unwrap_or(100);
1108 let mut query = String::from(
1109 "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
1110 FROM drift_percentage_metrics
1111 WHERE 1=1"
1112 );
1113
1114 if workspace_id.is_some() {
1115 query.push_str(" AND workspace_id = ?");
1116 }
1117 if org_id.is_some() {
1118 query.push_str(" AND org_id = ?");
1119 }
1120 query.push_str(" ORDER BY measured_at DESC LIMIT ?");
1121
1122 let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
1123 if let Some(ws_id) = workspace_id {
1124 q = q.bind(ws_id);
1125 }
1126 if let Some(o_id) = org_id {
1127 q = q.bind(o_id);
1128 }
1129 q = q.bind(limit);
1130
1131 let results = q.fetch_all(&self.pool).await?;
1132 Ok(results)
1133 }
1134}