1use crate::error::{AnalyticsError, Result};
4use crate::models::{
5 AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
6 EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
7 PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
8};
9use futures::TryStreamExt;
10use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
11use std::path::Path;
12use tracing::{debug, error, info};
13
/// SQLite-backed store for aggregated analytics data (request metrics,
/// errors, traffic patterns, and coverage/drift KPIs).
///
/// Cloning is cheap: the wrapped connection pool is reference-counted.
#[derive(Clone)]
pub struct AnalyticsDatabase {
    // Shared sqlx connection pool; configured in `new` (10 connections max).
    pool: Pool<Sqlite>,
}
19
20impl AnalyticsDatabase {
    /// Opens the analytics database at `database_path` and returns a handle.
    ///
    /// A path of `:memory:` selects an in-memory database; any other path is
    /// treated as a file path. WAL journaling and foreign-key enforcement are
    /// enabled on the fresh pool.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the connection cannot be
    /// established or either PRAGMA statement fails.
    pub async fn new(database_path: &Path) -> Result<Self> {
        // NOTE(review): a non-UTF-8 path makes `to_str()` return None and
        // falls through to the file branch with a lossy `display()` rendering
        // — confirm callers only pass UTF-8 paths.
        let db_url = if database_path.to_str() == Some(":memory:") {
            "sqlite::memory:".to_string()
        } else {
            // NOTE(review): no `?mode=rwc` flag here — sqlx will not create
            // the database file if it does not already exist; verify callers
            // pre-create it or that auto-creation is intentionally disabled.
            format!("sqlite://{}", database_path.display())
        };

        info!("Connecting to analytics database: {}", db_url);

        let pool =
            SqlitePoolOptions::new()
                .max_connections(10)
                .connect(&db_url)
                .await
                .map_err(|e| {
                    error!("Failed to connect to analytics database: {}", e);
                    AnalyticsError::Database(e)
                })?;

        // WAL improves concurrent read/write throughput for on-disk databases.
        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;

        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;

        Ok(Self { pool })
    }
57
58 pub async fn run_migrations(&self) -> Result<()> {
64 info!("Running analytics database migrations");
65
66 let migration_sql = include_str!("../migrations/001_analytics_schema.sql");
68 let mut conn = self.pool.acquire().await?;
69 let mut stream = conn.execute_many(migration_sql);
70
71 while stream
72 .try_next()
73 .await
74 .map_err(|e| {
75 error!("Migration error: {}", e);
76 AnalyticsError::Migration(format!("Failed to execute migration: {e}"))
77 })?
78 .is_some()
79 {}
80
81 let coverage_migration_sql = include_str!("../migrations/003_coverage_metrics.sql");
83 let mut conn = self.pool.acquire().await?;
84 let mut stream = conn.execute_many(coverage_migration_sql);
85
86 while stream
87 .try_next()
88 .await
89 .map_err(|e| {
90 error!("Coverage metrics migration error: {}", e);
91 AnalyticsError::Migration(format!(
92 "Failed to execute coverage metrics migration: {e}"
93 ))
94 })?
95 .is_some()
96 {}
97
98 let pillar_usage_migration_sql = include_str!("../migrations/002_pillar_usage.sql");
100 let mut conn = self.pool.acquire().await?;
101 let mut stream = conn.execute_many(pillar_usage_migration_sql);
102
103 while stream
104 .try_next()
105 .await
106 .map_err(|e| {
107 error!("Pillar usage migration error: {}", e);
108 AnalyticsError::Migration(format!("Failed to execute pillar usage migration: {e}"))
109 })?
110 .is_some()
111 {}
112
113 info!("Analytics database migrations completed successfully");
114 Ok(())
115 }
116
    /// Returns the underlying connection pool, for callers that need to run
    /// ad-hoc queries against the analytics database.
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }
122
    /// Inserts a single minute-level metrics row and returns its rowid.
    ///
    /// The bind chain below must stay in exactly the same order as the
    /// column list in the INSERT.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the insert fails.
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
163
    /// Inserts a batch of minute-level aggregates inside one transaction, so
    /// the batch is applied atomically. No-op for an empty slice.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if beginning, executing, or
    /// committing fails; the transaction rolls back on drop in that case.
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        if aggregates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            // Bind order must mirror the column list exactly.
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }
215
    /// Inserts a single hour-level metrics row and returns its rowid.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the insert fails.
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
256
    /// Inserts a single day-level metrics row and returns its rowid.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the insert fails.
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
300
    /// Upserts per-endpoint rollup stats: counters are accumulated, min/max
    /// latencies merged, and snapshot-style fields (avg, p95, status codes,
    /// last_seen) overwritten with the incoming values.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the upsert fails.
    ///
    /// NOTE(review): the conflict target uses COALESCE expressions — SQLite
    /// requires a matching expression-based unique index on the table;
    /// confirm the migration defines one.
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
351
    /// Inserts a single error event row and returns its rowid.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the insert fails.
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
389
    /// Upserts one (date, hour) traffic-pattern bucket: counters accumulate,
    /// while avg latency and unique-client counts are overwritten with the
    /// incoming values.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the upsert fails.
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
425
    /// Inserts a point-in-time analytics snapshot and returns its rowid.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the insert fails.
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
461
    /// Queries minute-level aggregates matching `filter`, newest first.
    ///
    /// Each populated filter field adds one SQL clause; `limit`, if set,
    /// caps the result count.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        // Clause-building section: the bind section below must append values
        // in exactly the same order as these placeholders are appended.
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        // Bind section — keep in lockstep with the clause-building above.
        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
543
    /// Queries hour-level aggregates matching `filter`, newest first.
    ///
    /// Same clause/bind discipline as `get_minute_aggregates`: each populated
    /// filter field adds one clause, and binds must mirror clause order.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_hour_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<HourMetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);

        // Bind order must match the clause order above.
        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
621
622 pub async fn get_top_endpoints(
628 &self,
629 limit: i64,
630 workspace_id: Option<&str>,
631 ) -> Result<Vec<EndpointStats>> {
632 let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
633
634 if workspace_id.is_some() {
635 query.push_str(" AND workspace_id = ?");
636 }
637
638 query.push_str(" ORDER BY total_requests DESC LIMIT ?");
639
640 let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
641
642 if let Some(workspace) = workspace_id {
643 sql_query = sql_query.bind(workspace);
644 }
645
646 sql_query = sql_query.bind(limit);
647
648 let results = sql_query.fetch_all(&self.pool).await?;
649
650 Ok(results)
651 }
652
    /// Returns the most recent error events, newest first, capped at `limit`.
    ///
    /// Only the start/end time, endpoint, and workspace fields of `filter`
    /// are honored here; the other filter fields are ignored.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        // Clause order must match the bind order below.
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
701
702 pub async fn get_traffic_patterns(
708 &self,
709 days: i64,
710 workspace_id: Option<&str>,
711 ) -> Result<Vec<TrafficPattern>> {
712 let start_date = chrono::Utc::now() - chrono::Duration::days(days);
713 let start_date_str = start_date.format("%Y-%m-%d").to_string();
714
715 let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
716
717 if workspace_id.is_some() {
718 query.push_str(" AND workspace_id = ?");
719 }
720
721 query.push_str(" ORDER BY date ASC, hour ASC");
722
723 let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
724
725 if let Some(workspace) = workspace_id {
726 query_builder = query_builder.bind(workspace);
727 }
728
729 let results = query_builder.fetch_all(&self.pool).await?;
730
731 Ok(results)
732 }
733
734 pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
744 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
745
746 let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
747 .bind(cutoff)
748 .execute(&self.pool)
749 .await?;
750
751 info!(
752 "Cleaned up {} minute aggregates older than {} days",
753 result.rows_affected(),
754 days
755 );
756 Ok(result.rows_affected())
757 }
758
759 pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
765 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
766
767 let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
768 .bind(cutoff)
769 .execute(&self.pool)
770 .await?;
771
772 info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
773 Ok(result.rows_affected())
774 }
775
776 pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
782 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
783
784 let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
785 .bind(cutoff)
786 .execute(&self.pool)
787 .await?;
788
789 info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
790 Ok(result.rows_affected())
791 }
792
    /// Runs SQLite `VACUUM` to compact the database file after bulk deletes.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the statement fails.
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
804}
805
806impl AnalyticsDatabase {
    /// Increments the usage counter for a scenario, creating the row on
    /// first use.
    ///
    /// `workspace_id` / `org_id` match NULL-safely: a `None` argument only
    /// matches rows where the corresponding column is NULL.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if either statement fails.
    ///
    /// NOTE(review): UPDATE-then-INSERT is not atomic — two concurrent
    /// callers can both see 0 updated rows and both insert. Consider a
    /// single `INSERT ... ON CONFLICT DO UPDATE` if the table has a suitable
    /// unique index; confirm against the schema.
    pub async fn record_scenario_usage(
        &self,
        scenario_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // Common case first: bump the existing row.
        let rows_affected = sqlx::query(
            "UPDATE scenario_usage_metrics
             SET usage_count = usage_count + 1,
                 last_used_at = ?,
                 updated_at = ?
             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
                 AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(now)
        .bind(scenario_id)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // No existing row matched: create it with an initial count of 1.
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
                 VALUES (?, ?, ?, 1, ?, ?, ?)"
            )
            .bind(scenario_id)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
862
863 pub async fn record_persona_ci_hit(
869 &self,
870 persona_id: &str,
871 workspace_id: Option<&str>,
872 org_id: Option<&str>,
873 ci_run_id: Option<&str>,
874 ) -> Result<()> {
875 let now = chrono::Utc::now().timestamp();
876
877 sqlx::query(
878 "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
879 VALUES (?, ?, ?, ?, 1, ?)"
880 )
881 .bind(persona_id)
882 .bind(workspace_id)
883 .bind(org_id)
884 .bind(ci_run_id)
885 .bind(now)
886 .execute(&self.pool)
887 .await?;
888
889 Ok(())
890 }
891
    /// Increments the test counter for an endpoint, creating the row on
    /// first use. A `None` `coverage_percentage` preserves the previously
    /// stored value (via COALESCE).
    ///
    /// Nullable identity columns match NULL-safely, as in
    /// `record_scenario_usage`.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if either statement fails.
    ///
    /// NOTE(review): same UPDATE-then-INSERT race as `record_scenario_usage`
    /// — concurrent first hits can double-insert; confirm the table's
    /// constraints tolerate or prevent this.
    #[allow(clippy::too_many_arguments)]
    pub async fn record_endpoint_coverage(
        &self,
        endpoint: &str,
        method: Option<&str>,
        protocol: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        coverage_percentage: Option<f64>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // Common case first: bump the existing row.
        let rows_affected = sqlx::query(
            "UPDATE endpoint_coverage
             SET test_count = test_count + 1,
                 last_tested_at = ?,
                 coverage_percentage = COALESCE(?, coverage_percentage),
                 updated_at = ?
             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
                 AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
                 AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(coverage_percentage)
        .bind(now)
        .bind(endpoint)
        .bind(method)
        .bind(method)
        .bind(protocol)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // No row matched: create it with an initial test_count of 1.
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
            )
            .bind(endpoint)
            .bind(method)
            .bind(protocol)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(coverage_percentage)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
955
956 #[allow(clippy::too_many_arguments)]
962 pub async fn record_reality_level_staleness(
963 &self,
964 workspace_id: &str,
965 org_id: Option<&str>,
966 endpoint: Option<&str>,
967 method: Option<&str>,
968 protocol: Option<&str>,
969 current_reality_level: Option<&str>,
970 staleness_days: Option<i32>,
971 ) -> Result<()> {
972 let now = chrono::Utc::now().timestamp();
973 let last_updated = Some(staleness_days.map_or(now, |days| now - (i64::from(days) * 86400)));
974
975 sqlx::query(
976 "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
977 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
978 ON CONFLICT DO NOTHING"
979 )
980 .bind(workspace_id)
981 .bind(org_id)
982 .bind(endpoint)
983 .bind(method)
984 .bind(protocol)
985 .bind(current_reality_level)
986 .bind(last_updated)
987 .bind(staleness_days)
988 .bind(now)
989 .bind(now)
990 .execute(&self.pool)
991 .await?;
992
993 Ok(())
994 }
995
996 pub async fn record_drift_percentage(
1002 &self,
1003 workspace_id: &str,
1004 org_id: Option<&str>,
1005 total_mocks: i64,
1006 drifting_mocks: i64,
1007 ) -> Result<()> {
1008 let now = chrono::Utc::now().timestamp();
1009 #[allow(clippy::cast_precision_loss)]
1010 let drift_percentage = if total_mocks > 0 {
1011 (drifting_mocks as f64 / total_mocks as f64) * 100.0
1012 } else {
1013 0.0
1014 };
1015
1016 sqlx::query(
1017 "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
1018 VALUES (?, ?, ?, ?, ?, ?)"
1019 )
1020 .bind(workspace_id)
1021 .bind(org_id)
1022 .bind(total_mocks)
1023 .bind(drifting_mocks)
1024 .bind(drift_percentage)
1025 .bind(now)
1026 .execute(&self.pool)
1027 .await?;
1028
1029 Ok(())
1030 }
1031
    /// Lists scenario usage rows, most-used first, optionally filtered by
    /// workspace and org. `limit` defaults to 100 when `None`.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_scenario_usage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<ScenarioUsageMetrics>> {
        let limit = limit.unwrap_or(100);
        // Clause order must match bind order below.
        let mut query = String::from(
            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
             FROM scenario_usage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY usage_count DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1070
    /// Lists persona CI hits, most recent first, optionally filtered by
    /// workspace and org. `limit` defaults to 100 when `None`.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_persona_ci_hits(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<PersonaCIHit>> {
        let limit = limit.unwrap_or(100);
        // Clause order must match bind order below.
        let mut query = String::from(
            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
             FROM persona_ci_hits
             WHERE 1=1",
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY hit_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1109
    /// Lists endpoint coverage rows, least-covered first.
    ///
    /// When `min_coverage` is given, only rows with NULL coverage or
    /// coverage *below* the threshold are returned (i.e. under-covered
    /// endpoints).
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_endpoint_coverage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        min_coverage: Option<f64>,
    ) -> Result<Vec<EndpointCoverage>> {
        // Clause order must match bind order below.
        let mut query = String::from(
            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
             FROM endpoint_coverage
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if min_coverage.is_some() {
            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
        }
        // NULLS LAST requires SQLite >= 3.30 — presumably guaranteed by the
        // bundled sqlx SQLite; confirm the minimum supported version.
        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");

        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(min_cov) = min_coverage {
            q = q.bind(min_cov);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1152
    /// Lists reality-level staleness rows, stalest first.
    ///
    /// NOTE(review): despite the name, when `max_staleness_days` is given the
    /// query returns rows whose staleness is NULL or *greater than* the
    /// value — it acts as a minimum threshold, not a cap; confirm callers
    /// expect that.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_reality_level_staleness(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        max_staleness_days: Option<i32>,
    ) -> Result<Vec<RealityLevelStaleness>> {
        // Clause order must match bind order below.
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
             FROM reality_level_staleness
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if max_staleness_days.is_some() {
            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
        }
        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");

        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(max_days) = max_staleness_days {
            q = q.bind(max_days);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1195
    /// Lists drift measurements, newest first, optionally filtered by
    /// workspace and org. `limit` defaults to 100 when `None`.
    ///
    /// # Errors
    ///
    /// Returns `AnalyticsError::Database` if the query fails.
    pub async fn get_drift_percentage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<DriftPercentageMetrics>> {
        let limit = limit.unwrap_or(100);
        // Clause order must match bind order below.
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
             FROM drift_percentage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY measured_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1234}
1235
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test: an in-memory database can be created and migrated.
    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    // A fully-populated minute aggregate inserts and yields a positive rowid.
    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10_000,
            bytes_received: 5_000,
            active_connections: Some(10),
            created_at: None,
        };

        let id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(id > 0);
    }
}