1use crate::error::{AnalyticsError, Result};
4use crate::models::{
5 AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
6 EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
7 PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
8};
9use futures::TryStreamExt;
10use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
11use std::path::Path;
12use tracing::{debug, error, info};
13
/// SQLite-backed store for analytics metrics, error events, and usage data.
///
/// Cloning is cheap: the inner [`Pool`] is reference-counted, so clones
/// share the same underlying connection pool.
#[derive(Clone)]
pub struct AnalyticsDatabase {
    /// Connection pool shared by all clones of this handle.
    pool: Pool<Sqlite>,
}
19
20impl AnalyticsDatabase {
    /// Opens (or creates) the analytics database at `database_path`.
    ///
    /// The literal path `:memory:` yields an in-memory database. For file
    /// paths, missing parent directories are created first and the
    /// connection URL uses `mode=rwc` so the database file is created on
    /// demand.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the parent directory cannot
    /// be created, the connection fails, or a PRAGMA statement fails.
    pub async fn new(database_path: &Path) -> Result<Self> {
        let db_url = if database_path.to_str() == Some(":memory:") {
            "sqlite::memory:".to_string()
        } else {
            if let Some(parent) = database_path.parent() {
                // Only attempt creation for a real, not-yet-existing parent dir
                // (`parent()` returns "" for bare file names).
                if !parent.as_os_str().is_empty() && !parent.exists() {
                    std::fs::create_dir_all(parent).map_err(|e| {
                        error!("Failed to create analytics database directory: {}", e);
                        AnalyticsError::Database(sqlx::Error::Io(e))
                    })?;
                }
            }
            // `mode=rwc` = read/write, create the file if it does not exist.
            format!("sqlite://{}?mode=rwc", database_path.display())
        };

        info!("Connecting to analytics database: {}", db_url);

        let pool =
            SqlitePoolOptions::new()
                .max_connections(10)
                .connect(&db_url)
                .await
                .map_err(|e| {
                    error!("Failed to connect to analytics database: {}", e);
                    AnalyticsError::Database(e)
                })?;

        // WAL improves concurrent read performance (journal_mode is persistent
        // per database file).
        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;

        // NOTE(review): `PRAGMA foreign_keys` is per-connection, and executing
        // it through the pool only affects the one connection that runs it —
        // other pooled connections may not have it enabled. Consider an
        // `after_connect` hook; confirm whether FK enforcement matters here.
        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;

        Ok(Self { pool })
    }
66
67 pub async fn run_migrations(&self) -> Result<()> {
77 info!("Running analytics database migrations");
78
79 Self::run_migration_file(
80 &self.pool,
81 "001_analytics_schema",
82 include_str!("../migrations/001_analytics_schema.sql"),
83 )
84 .await?;
85 Self::run_migration_file(
86 &self.pool,
87 "003_coverage_metrics",
88 include_str!("../migrations/003_coverage_metrics.sql"),
89 )
90 .await?;
91 Self::run_migration_file(
92 &self.pool,
93 "002_pillar_usage",
94 include_str!("../migrations/002_pillar_usage.sql"),
95 )
96 .await?;
97
98 info!("Analytics database migrations completed successfully");
99 Ok(())
100 }
101
    /// Executes every statement in `sql` on a single pooled connection.
    ///
    /// Errors whose message contains "already exists" are logged and
    /// skipped, making re-runs against an existing schema harmless; any
    /// other error aborts with [`AnalyticsError::Migration`].
    async fn run_migration_file(pool: &Pool<Sqlite>, name: &str, sql: &str) -> Result<()> {
        debug!("Running migration: {}", name);
        let mut conn = pool.acquire().await?;
        // `execute_many` splits the file into individual statements and
        // yields one result per statement.
        let mut stream = conn.execute_many(sql);

        // `try_next().await` gives Result<Option<_>>; `transpose` flips it to
        // Option<Result<_>> so the loop ends on stream exhaustion while still
        // surfacing per-statement errors inside the body.
        while let Some(result) = stream.try_next().await.transpose() {
            match result {
                Ok(_) => {}
                Err(e) => {
                    let msg = e.to_string();
                    // String matching is brittle, but SQLite exposes no stable
                    // error code for "object already exists" through sqlx here.
                    if msg.contains("already exists") {
                        debug!(
                            "Migration {}: object already exists (safe to ignore): {}",
                            name, msg
                        );
                    } else {
                        error!("Migration {} error: {}", name, e);
                        return Err(AnalyticsError::Migration(format!(
                            "Failed to execute migration {name}: {e}"
                        )));
                    }
                }
            }
        }
        Ok(())
    }
131
    /// Returns a reference to the underlying SQLite connection pool,
    /// allowing callers to run queries this type does not wrap.
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }
137
    /// Inserts one per-minute metrics row and returns its SQLite rowid.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the insert fails.
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        // Positional placeholders: the bind order below must match the
        // column list in the INSERT exactly.
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
178
    /// Inserts a batch of per-minute rows inside a single transaction.
    ///
    /// One transaction avoids a journal sync per row; either all rows
    /// commit or none do. No-op for an empty slice.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if any insert or the commit
    /// fails (the transaction is rolled back on drop).
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        if aggregates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            // Bind order must match the column list exactly (positional `?`).
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }
230
    /// Inserts one per-hour metrics row and returns its SQLite rowid.
    ///
    /// Unlike the minute table, hourly rows carry average and maximum
    /// connection counts instead of a point-in-time value.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the insert fails.
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        // Bind order must match the column list exactly (positional `?`).
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
271
    /// Inserts one per-day metrics row and returns its SQLite rowid.
    ///
    /// Daily rows store both a `date` string and a numeric `timestamp`,
    /// plus day-level extras (`unique_clients`, `peak_hour`).
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the insert fails.
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        // Bind order must match the column list exactly (positional `?`).
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
315
    /// Inserts or accumulates per-endpoint lifetime statistics.
    ///
    /// On conflict, counters and byte totals are summed into the existing
    /// row; latency aggregates and `status_codes` are replaced with the
    /// incoming values, and min/max latency are merged.
    ///
    /// NOTE(review): the `ON CONFLICT` target uses COALESCE expressions;
    /// SQLite only accepts that if a matching unique index on the same
    /// expressions exists — confirm the schema defines one.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the upsert fails.
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
366
    /// Inserts one error event row and returns its SQLite rowid.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the insert fails.
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        // Bind order must match the column list exactly (positional `?`).
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
404
    /// Inserts or accumulates one (date, hour) traffic-pattern bucket.
    ///
    /// On conflict, request/error counts are summed into the existing row
    /// while `avg_latency_ms` and `unique_clients` are overwritten with the
    /// incoming values.
    ///
    /// NOTE(review): the `ON CONFLICT` target uses COALESCE expressions and
    /// requires a matching unique index — confirm the schema defines one.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the upsert fails.
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
440
    /// Inserts one point-in-time analytics snapshot and returns its rowid.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the insert fails.
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        // Bind order must match the column list exactly (positional `?`).
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
476
    /// Fetches per-minute aggregates matching `filter`, newest first.
    ///
    /// The WHERE clause is assembled from whichever filter fields are set.
    /// SQLite `?` placeholders are positional, so the `bind` sequence below
    /// must mirror the `push_str` sequence above exactly — keep the two
    /// lists in lock-step when editing.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        // Bind in the same order the clauses were appended.
        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
558
    /// Fetches per-hour aggregates matching `filter`, newest first.
    ///
    /// Mirrors [`Self::get_minute_aggregates`] but reads the hourly table.
    /// The `bind` sequence must stay in lock-step with the `push_str`
    /// sequence because SQLite placeholders are positional.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_hour_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<HourMetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);

        // Bind in the same order the clauses were appended.
        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
636
637 pub async fn get_top_endpoints(
643 &self,
644 limit: i64,
645 workspace_id: Option<&str>,
646 ) -> Result<Vec<EndpointStats>> {
647 let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
648
649 if workspace_id.is_some() {
650 query.push_str(" AND workspace_id = ?");
651 }
652
653 query.push_str(" ORDER BY total_requests DESC LIMIT ?");
654
655 let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
656
657 if let Some(workspace) = workspace_id {
658 sql_query = sql_query.bind(workspace);
659 }
660
661 sql_query = sql_query.bind(limit);
662
663 let results = sql_query.fetch_all(&self.pool).await?;
664
665 Ok(results)
666 }
667
    /// Returns up to `limit` error events matching `filter`, newest first.
    ///
    /// Only the time-range, endpoint, and workspace fields of `filter` are
    /// honored here; other filter fields are ignored. Binds must stay in
    /// the same order as the appended clauses (positional `?`).
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
716
717 pub async fn get_traffic_patterns(
723 &self,
724 days: i64,
725 workspace_id: Option<&str>,
726 ) -> Result<Vec<TrafficPattern>> {
727 let start_date = chrono::Utc::now() - chrono::Duration::days(days);
728 let start_date_str = start_date.format("%Y-%m-%d").to_string();
729
730 let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
731
732 if workspace_id.is_some() {
733 query.push_str(" AND workspace_id = ?");
734 }
735
736 query.push_str(" ORDER BY date ASC, hour ASC");
737
738 let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
739
740 if let Some(workspace) = workspace_id {
741 query_builder = query_builder.bind(workspace);
742 }
743
744 let results = query_builder.fetch_all(&self.pool).await?;
745
746 Ok(results)
747 }
748
749 pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
759 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
760
761 let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
762 .bind(cutoff)
763 .execute(&self.pool)
764 .await?;
765
766 info!(
767 "Cleaned up {} minute aggregates older than {} days",
768 result.rows_affected(),
769 days
770 );
771 Ok(result.rows_affected())
772 }
773
774 pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
780 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
781
782 let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
783 .bind(cutoff)
784 .execute(&self.pool)
785 .await?;
786
787 info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
788 Ok(result.rows_affected())
789 }
790
791 pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
797 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
798
799 let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
800 .bind(cutoff)
801 .execute(&self.pool)
802 .await?;
803
804 info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
805 Ok(result.rows_affected())
806 }
807
    /// Runs SQLite `VACUUM` to reclaim free pages and defragment the file.
    ///
    /// This can be slow on large databases and takes an exclusive lock;
    /// call it from maintenance paths only.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if VACUUM fails.
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
819}
820
821impl AnalyticsDatabase {
    /// Increments the usage counter for a scenario, creating the row on
    /// first use.
    ///
    /// The `(col = ? OR (col IS NULL AND ? IS NULL))` pattern makes the
    /// match NULL-aware, since `col = NULL` is never true in SQL.
    ///
    /// NOTE(review): UPDATE-then-INSERT is not atomic — two concurrent
    /// first uses of the same scenario could both take the INSERT path.
    /// Confirm a unique index (or tolerance for duplicate rows) covers this.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if either statement fails.
    pub async fn record_scenario_usage(
        &self,
        scenario_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        let rows_affected = sqlx::query(
            "UPDATE scenario_usage_metrics
             SET usage_count = usage_count + 1,
                 last_used_at = ?,
                 updated_at = ?
             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(now)
        .bind(scenario_id)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // No existing row matched: create the initial counter row.
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
                 VALUES (?, ?, ?, 1, ?, ?, ?)"
            )
            .bind(scenario_id)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
877
    /// Records a single CI hit for a persona (append-only; one row per hit
    /// with `hit_count` fixed at 1).
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the insert fails.
    pub async fn record_persona_ci_hit(
        &self,
        persona_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        ci_run_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        sqlx::query(
            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
             VALUES (?, ?, ?, ?, 1, ?)"
        )
        .bind(persona_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(ci_run_id)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
906
    /// Increments the test counter for an endpoint, creating the row on
    /// first observation.
    ///
    /// `coverage_percentage` only overwrites the stored value when `Some`
    /// (via `COALESCE(?, coverage_percentage)`); `None` leaves it intact.
    ///
    /// NOTE(review): UPDATE-then-INSERT is not atomic — concurrent first
    /// observations of the same endpoint could both take the INSERT path.
    /// Confirm a unique index (or tolerance for duplicates) covers this.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if either statement fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn record_endpoint_coverage(
        &self,
        endpoint: &str,
        method: Option<&str>,
        protocol: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        coverage_percentage: Option<f64>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // NULL-aware matching: `col = NULL` is never true in SQL, hence the
        // `(col = ? OR (col IS NULL AND ? IS NULL))` pattern.
        let rows_affected = sqlx::query(
            "UPDATE endpoint_coverage
             SET test_count = test_count + 1,
                 last_tested_at = ?,
                 coverage_percentage = COALESCE(?, coverage_percentage),
                 updated_at = ?
             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
               AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(coverage_percentage)
        .bind(now)
        .bind(endpoint)
        .bind(method)
        .bind(method)
        .bind(protocol)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // No existing row matched: create the initial coverage row.
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
            )
            .bind(endpoint)
            .bind(method)
            .bind(protocol)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(coverage_percentage)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
970
971 #[allow(clippy::too_many_arguments)]
977 pub async fn record_reality_level_staleness(
978 &self,
979 workspace_id: &str,
980 org_id: Option<&str>,
981 endpoint: Option<&str>,
982 method: Option<&str>,
983 protocol: Option<&str>,
984 current_reality_level: Option<&str>,
985 staleness_days: Option<i32>,
986 ) -> Result<()> {
987 let now = chrono::Utc::now().timestamp();
988 let last_updated = Some(staleness_days.map_or(now, |days| now - (i64::from(days) * 86400)));
989
990 sqlx::query(
991 "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
992 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
993 ON CONFLICT DO NOTHING"
994 )
995 .bind(workspace_id)
996 .bind(org_id)
997 .bind(endpoint)
998 .bind(method)
999 .bind(protocol)
1000 .bind(current_reality_level)
1001 .bind(last_updated)
1002 .bind(staleness_days)
1003 .bind(now)
1004 .bind(now)
1005 .execute(&self.pool)
1006 .await?;
1007
1008 Ok(())
1009 }
1010
    /// Appends one drift measurement row for a workspace.
    ///
    /// The drift percentage is computed here as `drifting / total * 100`,
    /// with a zero-total fleet defined as 0% drift (avoids division by
    /// zero / NaN).
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the insert fails.
    pub async fn record_drift_percentage(
        &self,
        workspace_id: &str,
        org_id: Option<&str>,
        total_mocks: i64,
        drifting_mocks: i64,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();
        // i64 -> f64 precision loss is acceptable at these magnitudes.
        #[allow(clippy::cast_precision_loss)]
        let drift_percentage = if total_mocks > 0 {
            (drifting_mocks as f64 / total_mocks as f64) * 100.0
        } else {
            0.0
        };

        sqlx::query(
            "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
             VALUES (?, ?, ?, ?, ?, ?)"
        )
        .bind(workspace_id)
        .bind(org_id)
        .bind(total_mocks)
        .bind(drifting_mocks)
        .bind(drift_percentage)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
1046
    /// Returns scenario usage rows ordered by usage count (descending),
    /// capped at `limit` (default 100).
    ///
    /// Binds must stay in the same order as the appended clauses
    /// (positional `?`).
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_scenario_usage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<ScenarioUsageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
             FROM scenario_usage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY usage_count DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1085
    /// Returns persona CI-hit rows ordered by hit time (newest first),
    /// capped at `limit` (default 100).
    ///
    /// Binds must stay in the same order as the appended clauses
    /// (positional `?`).
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_persona_ci_hits(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<PersonaCIHit>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
             FROM persona_ci_hits
             WHERE 1=1",
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY hit_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1124
    /// Returns endpoint-coverage rows, worst coverage first.
    ///
    /// When `min_coverage` is set, only endpoints whose coverage is below
    /// the threshold — or has never been measured (NULL) — are returned,
    /// i.e. the ones needing attention.
    ///
    /// NOTE(review): `NULLS LAST` requires SQLite 3.30+ — confirm the
    /// minimum supported SQLite version.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_endpoint_coverage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        min_coverage: Option<f64>,
    ) -> Result<Vec<EndpointCoverage>> {
        let mut query = String::from(
            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
             FROM endpoint_coverage
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if min_coverage.is_some() {
            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
        }
        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");

        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(min_cov) = min_coverage {
            q = q.bind(min_cov);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1167
    /// Returns reality-level staleness rows, stalest first.
    ///
    /// NOTE(review): despite its name, `max_staleness_days` acts as a
    /// *lower* bound — the filter is `staleness_days > ?` (plus NULL rows),
    /// so it returns entries *exceeding* the threshold. Confirm intent and
    /// consider renaming the parameter.
    ///
    /// NOTE(review): `NULLS LAST` requires SQLite 3.30+ — confirm the
    /// minimum supported SQLite version.
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_reality_level_staleness(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        max_staleness_days: Option<i32>,
    ) -> Result<Vec<RealityLevelStaleness>> {
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
             FROM reality_level_staleness
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if max_staleness_days.is_some() {
            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
        }
        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");

        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(max_days) = max_staleness_days {
            q = q.bind(max_days);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1210
    /// Returns drift measurements ordered by measurement time (newest
    /// first), capped at `limit` (default 100).
    ///
    /// Binds must stay in the same order as the appended clauses
    /// (positional `?`).
    ///
    /// # Errors
    ///
    /// Returns [`AnalyticsError::Database`] if the query fails.
    pub async fn get_drift_percentage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<DriftPercentageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
             FROM drift_percentage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY measured_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1249}
1250
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an in-memory database connects and all migrations apply.
    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    /// Inserting a fully-populated minute aggregate yields a positive rowid.
    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10_000,
            bytes_received: 5_000,
            active_connections: Some(10),
            created_at: None,
        };

        let id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(id > 0);
    }
}