1use crate::error::{AnalyticsError, Result};
4use crate::models::{
5 AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
6 EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
7 PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
8};
9use futures::TryStreamExt;
10use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
11use std::path::Path;
12use tracing::{debug, error, info};
13
/// Handle to the SQLite-backed analytics store.
///
/// Wraps a `sqlx` connection pool, so cloning is cheap and all clones
/// share the same underlying database connections.
#[derive(Clone)]
pub struct AnalyticsDatabase {
    // Shared connection pool; every query in this module goes through it.
    pool: Pool<Sqlite>,
}
19
20impl AnalyticsDatabase {
    /// Open (or create) the analytics database at `database_path`.
    ///
    /// Passing the literal path `:memory:` opens an in-memory database;
    /// any other path is opened in read-write-create mode, creating the
    /// parent directory first when it does not exist.
    ///
    /// # Errors
    /// Returns an error if the directory cannot be created, the
    /// connection fails, or the initial PRAGMA statements fail.
    pub async fn new(database_path: &Path) -> Result<Self> {
        let db_url = if database_path.to_str() == Some(":memory:") {
            // NOTE(review): with max_connections(10), each pooled connection
            // to `sqlite::memory:` may receive its own private database —
            // verify the pool behaves as intended for the in-memory case
            // (tests below appear to rely on connection reuse).
            "sqlite::memory:".to_string()
        } else {
            if let Some(parent) = database_path.parent() {
                if !parent.as_os_str().is_empty() && !parent.exists() {
                    std::fs::create_dir_all(parent).map_err(|e| {
                        error!("Failed to create analytics database directory: {}", e);
                        AnalyticsError::Database(sqlx::Error::Io(e))
                    })?;
                }
            }
            // mode=rwc: create the database file if it is missing.
            format!("sqlite://{}?mode=rwc", database_path.display())
        };

        info!("Connecting to analytics database: {}", db_url);

        let pool =
            SqlitePoolOptions::new()
                .max_connections(10)
                .connect(&db_url)
                .await
                .map_err(|e| {
                    error!("Failed to connect to analytics database: {}", e);
                    AnalyticsError::Database(e)
                })?;

        // WAL journal mode improves concurrent read/write throughput.
        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;

        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;

        Ok(Self { pool })
    }
66
67 pub async fn run_migrations(&self) -> Result<()> {
73 info!("Running analytics database migrations");
74
75 let migration_sql = include_str!("../migrations/001_analytics_schema.sql");
77 let mut conn = self.pool.acquire().await?;
78 let mut stream = conn.execute_many(migration_sql);
79
80 while stream
81 .try_next()
82 .await
83 .map_err(|e| {
84 error!("Migration error: {}", e);
85 AnalyticsError::Migration(format!("Failed to execute migration: {e}"))
86 })?
87 .is_some()
88 {}
89
90 let coverage_migration_sql = include_str!("../migrations/003_coverage_metrics.sql");
92 let mut conn = self.pool.acquire().await?;
93 let mut stream = conn.execute_many(coverage_migration_sql);
94
95 while stream
96 .try_next()
97 .await
98 .map_err(|e| {
99 error!("Coverage metrics migration error: {}", e);
100 AnalyticsError::Migration(format!(
101 "Failed to execute coverage metrics migration: {e}"
102 ))
103 })?
104 .is_some()
105 {}
106
107 let pillar_usage_migration_sql = include_str!("../migrations/002_pillar_usage.sql");
109 let mut conn = self.pool.acquire().await?;
110 let mut stream = conn.execute_many(pillar_usage_migration_sql);
111
112 while stream
113 .try_next()
114 .await
115 .map_err(|e| {
116 error!("Pillar usage migration error: {}", e);
117 AnalyticsError::Migration(format!("Failed to execute pillar usage migration: {e}"))
118 })?
119 .is_some()
120 {}
121
122 info!("Analytics database migrations completed successfully");
123 Ok(())
124 }
125
    /// Borrow the underlying SQLite connection pool for ad-hoc queries.
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }
131
    /// Insert a single per-minute metrics aggregate and return its rowid.
    ///
    /// # Errors
    /// Returns an error if the INSERT fails.
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        // Bind order below must match the column list exactly.
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
172
    /// Insert a batch of per-minute aggregates inside one transaction so
    /// the whole batch is applied atomically.
    ///
    /// A no-op (no transaction is opened) when `aggregates` is empty.
    ///
    /// # Errors
    /// Returns an error if any INSERT or the commit fails; the
    /// transaction is rolled back on drop in that case.
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        if aggregates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            // Same statement and bind order as `insert_minute_aggregate`.
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }
224
    /// Insert a single per-hour metrics aggregate and return its rowid.
    ///
    /// Unlike minute rows, hour rows track both average and maximum
    /// active-connection counts.
    ///
    /// # Errors
    /// Returns an error if the INSERT fails.
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        // Bind order below must match the column list exactly.
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
265
    /// Insert a single per-day metrics aggregate and return its rowid.
    ///
    /// Day rows additionally record the `date` string, distinct client
    /// count, and the busiest hour of the day.
    ///
    /// # Errors
    /// Returns an error if the INSERT fails.
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        // Bind order below must match the column list exactly.
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
309
    /// Insert or merge cumulative statistics for one endpoint.
    ///
    /// On conflict (same endpoint/protocol/method/workspace/environment,
    /// with NULLs normalised via COALESCE): counters and byte totals are
    /// accumulated, min/max latencies are merged, while the average/p95
    /// latencies and the status-code breakdown are replaced by the
    /// incoming values.
    ///
    /// # Errors
    /// Returns an error if the statement fails.
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
360
    /// Record one error event (with tracing/correlation identifiers) and
    /// return its rowid.
    ///
    /// # Errors
    /// Returns an error if the INSERT fails.
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        // Bind order below must match the column list exactly.
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
398
    /// Insert or merge one hourly traffic-pattern bucket.
    ///
    /// On conflict (same date/hour/protocol/workspace/environment, with
    /// NULLs normalised via COALESCE) request and error counts are
    /// accumulated, while the latency average and unique-client count
    /// are replaced by the incoming values.
    ///
    /// # Errors
    /// Returns an error if the statement fails.
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
434
    /// Persist a point-in-time analytics snapshot (traffic plus process
    /// resource usage) and return its rowid.
    ///
    /// # Errors
    /// Returns an error if the INSERT fails.
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        // Bind order below must match the column list exactly.
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
470
    /// Fetch per-minute aggregates matching `filter`, newest first.
    ///
    /// The WHERE clause is assembled dynamically from the set fields of
    /// the filter; the bind calls further down must stay in exactly the
    /// same order as the clauses are appended here.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        // Binds mirror the clause order above, one bind per `Some` field.
        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
552
    /// Fetch per-hour aggregates matching `filter`, newest first.
    ///
    /// Mirrors [`Self::get_minute_aggregates`] but against the hourly
    /// table. The bind calls must stay in the same order as the WHERE
    /// clauses are appended.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_hour_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<HourMetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        // Binds mirror the clause order above, one bind per `Some` field.
        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
630
631 pub async fn get_top_endpoints(
637 &self,
638 limit: i64,
639 workspace_id: Option<&str>,
640 ) -> Result<Vec<EndpointStats>> {
641 let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
642
643 if workspace_id.is_some() {
644 query.push_str(" AND workspace_id = ?");
645 }
646
647 query.push_str(" ORDER BY total_requests DESC LIMIT ?");
648
649 let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
650
651 if let Some(workspace) = workspace_id {
652 sql_query = sql_query.bind(workspace);
653 }
654
655 sql_query = sql_query.bind(limit);
656
657 let results = sql_query.fetch_all(&self.pool).await?;
658
659 Ok(results)
660 }
661
    /// Fetch the most recent error events matching `filter`, newest
    /// first, capped at `limit` rows.
    ///
    /// Only the time-range, endpoint, and workspace fields of the filter
    /// are honoured here. Bind order must mirror the clause order.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
710
711 pub async fn get_traffic_patterns(
717 &self,
718 days: i64,
719 workspace_id: Option<&str>,
720 ) -> Result<Vec<TrafficPattern>> {
721 let start_date = chrono::Utc::now() - chrono::Duration::days(days);
722 let start_date_str = start_date.format("%Y-%m-%d").to_string();
723
724 let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
725
726 if workspace_id.is_some() {
727 query.push_str(" AND workspace_id = ?");
728 }
729
730 query.push_str(" ORDER BY date ASC, hour ASC");
731
732 let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
733
734 if let Some(workspace) = workspace_id {
735 query_builder = query_builder.bind(workspace);
736 }
737
738 let results = query_builder.fetch_all(&self.pool).await?;
739
740 Ok(results)
741 }
742
743 pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
753 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
754
755 let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
756 .bind(cutoff)
757 .execute(&self.pool)
758 .await?;
759
760 info!(
761 "Cleaned up {} minute aggregates older than {} days",
762 result.rows_affected(),
763 days
764 );
765 Ok(result.rows_affected())
766 }
767
768 pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
774 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
775
776 let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
777 .bind(cutoff)
778 .execute(&self.pool)
779 .await?;
780
781 info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
782 Ok(result.rows_affected())
783 }
784
785 pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
791 let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
792
793 let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
794 .bind(cutoff)
795 .execute(&self.pool)
796 .await?;
797
798 info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
799 Ok(result.rows_affected())
800 }
801
    /// Run SQLite `VACUUM` to rebuild the database file and reclaim
    /// space left behind by deleted rows.
    ///
    /// # Errors
    /// Returns an error if the VACUUM statement fails.
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
813}
814
815impl AnalyticsDatabase {
    /// Increment the usage counter for a scenario, creating the row on
    /// first use.
    ///
    /// `workspace_id` and `org_id` are matched NULL-safely: `None` only
    /// matches rows whose column is NULL.
    ///
    /// # Errors
    /// Returns an error if either statement fails.
    pub async fn record_scenario_usage(
        &self,
        scenario_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // NOTE(review): this UPDATE-then-INSERT pair is not atomic; two
        // concurrent first uses of a scenario could both see 0 rows
        // updated and both insert — consider a transaction or an upsert.
        let rows_affected = sqlx::query(
            "UPDATE scenario_usage_metrics
             SET usage_count = usage_count + 1,
                 last_used_at = ?,
                 updated_at = ?
             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
                   AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(now)
        .bind(scenario_id)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        if rows_affected.rows_affected() == 0 {
            // First use: create the row with an initial count of 1.
            sqlx::query(
                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
                 VALUES (?, ?, ?, 1, ?, ?, ?)"
            )
            .bind(scenario_id)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
871
    /// Append one persona CI-hit record.
    ///
    /// Each call inserts a fresh row with `hit_count` fixed at 1; rows
    /// are aggregated at query time rather than on write.
    ///
    /// # Errors
    /// Returns an error if the INSERT fails.
    pub async fn record_persona_ci_hit(
        &self,
        persona_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        ci_run_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        sqlx::query(
            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
             VALUES (?, ?, ?, ?, 1, ?)"
        )
        .bind(persona_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(ci_run_id)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
900
    /// Increment the test counter for an endpoint, creating the row on
    /// first observation.
    ///
    /// A `None` `coverage_percentage` leaves any previously recorded
    /// percentage in place (via COALESCE); optional identity columns are
    /// matched NULL-safely.
    ///
    /// # Errors
    /// Returns an error if either statement fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn record_endpoint_coverage(
        &self,
        endpoint: &str,
        method: Option<&str>,
        protocol: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        coverage_percentage: Option<f64>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // NOTE(review): UPDATE-then-INSERT is not atomic; two concurrent
        // first observations could both insert — consider an upsert.
        let rows_affected = sqlx::query(
            "UPDATE endpoint_coverage
             SET test_count = test_count + 1,
                 last_tested_at = ?,
                 coverage_percentage = COALESCE(?, coverage_percentage),
                 updated_at = ?
             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
                   AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
                   AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(coverage_percentage)
        .bind(now)
        .bind(endpoint)
        .bind(method)
        .bind(method)
        .bind(protocol)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        if rows_affected.rows_affected() == 0 {
            // First observation: create the row with a test count of 1.
            sqlx::query(
                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
            )
            .bind(endpoint)
            .bind(method)
            .bind(protocol)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(coverage_percentage)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
964
965 #[allow(clippy::too_many_arguments)]
971 pub async fn record_reality_level_staleness(
972 &self,
973 workspace_id: &str,
974 org_id: Option<&str>,
975 endpoint: Option<&str>,
976 method: Option<&str>,
977 protocol: Option<&str>,
978 current_reality_level: Option<&str>,
979 staleness_days: Option<i32>,
980 ) -> Result<()> {
981 let now = chrono::Utc::now().timestamp();
982 let last_updated = Some(staleness_days.map_or(now, |days| now - (i64::from(days) * 86400)));
983
984 sqlx::query(
985 "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
986 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
987 ON CONFLICT DO NOTHING"
988 )
989 .bind(workspace_id)
990 .bind(org_id)
991 .bind(endpoint)
992 .bind(method)
993 .bind(protocol)
994 .bind(current_reality_level)
995 .bind(last_updated)
996 .bind(staleness_days)
997 .bind(now)
998 .bind(now)
999 .execute(&self.pool)
1000 .await?;
1001
1002 Ok(())
1003 }
1004
1005 pub async fn record_drift_percentage(
1011 &self,
1012 workspace_id: &str,
1013 org_id: Option<&str>,
1014 total_mocks: i64,
1015 drifting_mocks: i64,
1016 ) -> Result<()> {
1017 let now = chrono::Utc::now().timestamp();
1018 #[allow(clippy::cast_precision_loss)]
1019 let drift_percentage = if total_mocks > 0 {
1020 (drifting_mocks as f64 / total_mocks as f64) * 100.0
1021 } else {
1022 0.0
1023 };
1024
1025 sqlx::query(
1026 "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
1027 VALUES (?, ?, ?, ?, ?, ?)"
1028 )
1029 .bind(workspace_id)
1030 .bind(org_id)
1031 .bind(total_mocks)
1032 .bind(drifting_mocks)
1033 .bind(drift_percentage)
1034 .bind(now)
1035 .execute(&self.pool)
1036 .await?;
1037
1038 Ok(())
1039 }
1040
    /// Fetch scenario usage metrics, most-used first.
    ///
    /// `limit` defaults to 100 when not provided. Bind order must mirror
    /// the clause order.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_scenario_usage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<ScenarioUsageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
             FROM scenario_usage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY usage_count DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1079
    /// Fetch persona CI-hit records, newest first.
    ///
    /// `limit` defaults to 100 when not provided. Bind order must mirror
    /// the clause order.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_persona_ci_hits(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<PersonaCIHit>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
             FROM persona_ci_hits
             WHERE 1=1",
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY hit_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1118
    /// Fetch endpoint coverage rows, least-covered first.
    ///
    /// NOTE(review): despite the name, `min_coverage` acts as an
    /// *upper-bound* filter — it selects rows whose coverage is below
    /// the threshold or still NULL (i.e. under-covered endpoints).
    /// Confirm that callers expect this semantics.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_endpoint_coverage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        min_coverage: Option<f64>,
    ) -> Result<Vec<EndpointCoverage>> {
        let mut query = String::from(
            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
             FROM endpoint_coverage
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if min_coverage.is_some() {
            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
        }
        // NULLS LAST requires SQLite >= 3.30.
        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");

        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(min_cov) = min_coverage {
            q = q.bind(min_cov);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1161
    /// Fetch reality-level staleness rows, stalest first.
    ///
    /// NOTE(review): despite the name, `max_staleness_days` acts as a
    /// *lower-bound* filter — it selects rows whose staleness exceeds
    /// the threshold or is still NULL (i.e. the stalest entries).
    /// Confirm that callers expect this semantics.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_reality_level_staleness(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        max_staleness_days: Option<i32>,
    ) -> Result<Vec<RealityLevelStaleness>> {
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
             FROM reality_level_staleness
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if max_staleness_days.is_some() {
            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
        }
        // NULLS LAST requires SQLite >= 3.30.
        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");

        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(max_days) = max_staleness_days {
            q = q.bind(max_days);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1204
    /// Fetch drift-percentage measurements, newest first.
    ///
    /// `limit` defaults to 100 when not provided. Bind order must mirror
    /// the clause order.
    ///
    /// # Errors
    /// Returns an error if the query fails.
    pub async fn get_drift_percentage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<DriftPercentageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
             FROM drift_percentage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY measured_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
1243}
1244
#[cfg(test)]
mod tests {
    use super::*;

    // Smoke test: an in-memory database can be opened and migrated.
    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    // Round-trip: after migration, a minute aggregate inserts cleanly
    // and yields a positive rowid.
    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10_000,
            bytes_received: 5_000,
            active_connections: Some(10),
            created_at: None,
        };

        let id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(id > 0);
    }
}