mockforge_analytics/
database.rs

1//! Database layer for analytics storage
2
3use crate::error::{AnalyticsError, Result};
4use crate::models::{
5    AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
6    EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
7    PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
8};
9use futures::TryStreamExt;
10use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
11use std::path::Path;
12use tracing::{debug, error, info};
13
/// Analytics database manager
///
/// Cloneable handle over a shared `SQLite` connection pool; clones share the
/// same underlying pool, so this type is cheap to pass around. All insert,
/// query, and cleanup helpers below go through this pool.
#[derive(Clone)]
pub struct AnalyticsDatabase {
    // Shared sqlx connection pool used by every operation on this struct.
    pool: Pool<Sqlite>,
}
19
20impl AnalyticsDatabase {
21    /// Create a new analytics database connection
22    ///
23    /// # Arguments
24    /// * `database_path` - Path to the `SQLite` database file (or ":memory:" for in-memory)
25    pub async fn new(database_path: &Path) -> Result<Self> {
26        let db_url = if database_path.to_str() == Some(":memory:") {
27            "sqlite::memory:".to_string()
28        } else {
29            format!("sqlite://{}", database_path.display())
30        };
31
32        info!("Connecting to analytics database: {}", db_url);
33
34        let pool =
35            SqlitePoolOptions::new()
36                .max_connections(10)
37                .connect(&db_url)
38                .await
39                .map_err(|e| {
40                    error!("Failed to connect to analytics database: {}", e);
41                    AnalyticsError::Database(e)
42                })?;
43
44        // Enable WAL mode for better concurrency
45        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;
46
47        // Enable foreign keys
48        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;
49
50        Ok(Self { pool })
51    }
52
53    /// Run database migrations
54    pub async fn run_migrations(&self) -> Result<()> {
55        info!("Running analytics database migrations");
56
57        // Run initial schema migration
58        let migration_sql = include_str!("../migrations/001_analytics_schema.sql");
59        let mut conn = self.pool.acquire().await?;
60        let mut stream = conn.execute_many(migration_sql);
61
62        while let Some(_) = stream.try_next().await.map_err(|e| {
63            error!("Migration error: {}", e);
64            AnalyticsError::Migration(format!("Failed to execute migration: {e}"))
65        })? {}
66
67        // Run coverage metrics migration
68        let coverage_migration_sql = include_str!("../migrations/002_coverage_metrics.sql");
69        let mut conn = self.pool.acquire().await?;
70        let mut stream = conn.execute_many(coverage_migration_sql);
71
72        while let Some(_) = stream.try_next().await.map_err(|e| {
73            error!("Coverage metrics migration error: {}", e);
74            AnalyticsError::Migration(format!("Failed to execute coverage metrics migration: {e}"))
75        })? {}
76
77        // Run pillar usage migration
78        let pillar_usage_migration_sql = include_str!("../migrations/002_pillar_usage.sql");
79        let mut conn = self.pool.acquire().await?;
80        let mut stream = conn.execute_many(pillar_usage_migration_sql);
81
82        while let Some(_) = stream.try_next().await.map_err(|e| {
83            error!("Pillar usage migration error: {}", e);
84            AnalyticsError::Migration(format!("Failed to execute pillar usage migration: {e}"))
85        })? {}
86
87        info!("Analytics database migrations completed successfully");
88        Ok(())
89    }
90
    /// Get a reference to the database pool
    ///
    /// Exposes the raw pool for ad-hoc queries not covered by the typed
    /// helper methods on this struct.
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }
96
97    // ========================================================================
98    // Insert Operations
99    // ========================================================================
100
101    /// Insert a minute-level metrics aggregate
102    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
103        let result = sqlx::query(
104            "INSERT INTO metrics_aggregates_minute (
105                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
106                request_count, error_count, latency_sum, latency_min, latency_max,
107                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
108            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
109        )
110        .bind(agg.timestamp)
111        .bind(&agg.protocol)
112        .bind(&agg.method)
113        .bind(&agg.endpoint)
114        .bind(agg.status_code)
115        .bind(&agg.workspace_id)
116        .bind(&agg.environment)
117        .bind(agg.request_count)
118        .bind(agg.error_count)
119        .bind(agg.latency_sum)
120        .bind(agg.latency_min)
121        .bind(agg.latency_max)
122        .bind(agg.latency_p50)
123        .bind(agg.latency_p95)
124        .bind(agg.latency_p99)
125        .bind(agg.bytes_sent)
126        .bind(agg.bytes_received)
127        .bind(agg.active_connections)
128        .execute(&self.pool)
129        .await?;
130
131        Ok(result.last_insert_rowid())
132    }
133
134    /// Insert multiple minute-level aggregates in a batch
135    pub async fn insert_minute_aggregates_batch(
136        &self,
137        aggregates: &[MetricsAggregate],
138    ) -> Result<()> {
139        if aggregates.is_empty() {
140            return Ok(());
141        }
142
143        let mut tx = self.pool.begin().await?;
144
145        for agg in aggregates {
146            sqlx::query(
147                r"
148                INSERT INTO metrics_aggregates_minute (
149                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
150                    request_count, error_count, latency_sum, latency_min, latency_max,
151                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
152                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
153                ",
154            )
155            .bind(agg.timestamp)
156            .bind(&agg.protocol)
157            .bind(&agg.method)
158            .bind(&agg.endpoint)
159            .bind(agg.status_code)
160            .bind(&agg.workspace_id)
161            .bind(&agg.environment)
162            .bind(agg.request_count)
163            .bind(agg.error_count)
164            .bind(agg.latency_sum)
165            .bind(agg.latency_min)
166            .bind(agg.latency_max)
167            .bind(agg.latency_p50)
168            .bind(agg.latency_p95)
169            .bind(agg.latency_p99)
170            .bind(agg.bytes_sent)
171            .bind(agg.bytes_received)
172            .bind(agg.active_connections)
173            .execute(&mut *tx)
174            .await?;
175        }
176
177        tx.commit().await?;
178        debug!("Inserted {} minute aggregates", aggregates.len());
179        Ok(())
180    }
181
182    /// Insert an hour-level metrics aggregate
183    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
184        let result = sqlx::query(
185            r"
186            INSERT INTO metrics_aggregates_hour (
187                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
188                request_count, error_count, latency_sum, latency_min, latency_max,
189                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
190                active_connections_avg, active_connections_max
191            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
192            ",
193        )
194        .bind(agg.timestamp)
195        .bind(&agg.protocol)
196        .bind(&agg.method)
197        .bind(&agg.endpoint)
198        .bind(agg.status_code)
199        .bind(&agg.workspace_id)
200        .bind(&agg.environment)
201        .bind(agg.request_count)
202        .bind(agg.error_count)
203        .bind(agg.latency_sum)
204        .bind(agg.latency_min)
205        .bind(agg.latency_max)
206        .bind(agg.latency_p50)
207        .bind(agg.latency_p95)
208        .bind(agg.latency_p99)
209        .bind(agg.bytes_sent)
210        .bind(agg.bytes_received)
211        .bind(agg.active_connections_avg)
212        .bind(agg.active_connections_max)
213        .execute(&self.pool)
214        .await?;
215
216        Ok(result.last_insert_rowid())
217    }
218
219    /// Insert a day-level metrics aggregate
220    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
221        let result = sqlx::query(
222            r"
223            INSERT INTO metrics_aggregates_day (
224                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
225                request_count, error_count, latency_sum, latency_min, latency_max,
226                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
227                active_connections_avg, active_connections_max, unique_clients, peak_hour
228            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
229            ",
230        )
231        .bind(&agg.date)
232        .bind(agg.timestamp)
233        .bind(&agg.protocol)
234        .bind(&agg.method)
235        .bind(&agg.endpoint)
236        .bind(agg.status_code)
237        .bind(&agg.workspace_id)
238        .bind(&agg.environment)
239        .bind(agg.request_count)
240        .bind(agg.error_count)
241        .bind(agg.latency_sum)
242        .bind(agg.latency_min)
243        .bind(agg.latency_max)
244        .bind(agg.latency_p50)
245        .bind(agg.latency_p95)
246        .bind(agg.latency_p99)
247        .bind(agg.bytes_sent)
248        .bind(agg.bytes_received)
249        .bind(agg.active_connections_avg)
250        .bind(agg.active_connections_max)
251        .bind(agg.unique_clients)
252        .bind(agg.peak_hour)
253        .execute(&self.pool)
254        .await?;
255
256        Ok(result.last_insert_rowid())
257    }
258
259    /// Insert or update endpoint statistics
260    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
261        sqlx::query(
262            r"
263            INSERT INTO endpoint_stats (
264                endpoint, protocol, method, workspace_id, environment,
265                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
266                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
267                first_seen, last_seen
268            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
269            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
270            DO UPDATE SET
271                total_requests = total_requests + excluded.total_requests,
272                total_errors = total_errors + excluded.total_errors,
273                avg_latency_ms = excluded.avg_latency_ms,
274                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
275                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
276                p95_latency_ms = excluded.p95_latency_ms,
277                status_codes = excluded.status_codes,
278                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
279                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
280                last_seen = excluded.last_seen,
281                updated_at = strftime('%s', 'now')
282            ",
283        )
284        .bind(&stats.endpoint)
285        .bind(&stats.protocol)
286        .bind(&stats.method)
287        .bind(&stats.workspace_id)
288        .bind(&stats.environment)
289        .bind(stats.total_requests)
290        .bind(stats.total_errors)
291        .bind(stats.avg_latency_ms)
292        .bind(stats.min_latency_ms)
293        .bind(stats.max_latency_ms)
294        .bind(stats.p95_latency_ms)
295        .bind(&stats.status_codes)
296        .bind(stats.total_bytes_sent)
297        .bind(stats.total_bytes_received)
298        .bind(stats.first_seen)
299        .bind(stats.last_seen)
300        .execute(&self.pool)
301        .await?;
302
303        Ok(())
304    }
305
306    /// Insert an error event
307    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
308        let result = sqlx::query(
309            r"
310            INSERT INTO error_events (
311                timestamp, protocol, method, endpoint, status_code,
312                error_type, error_message, error_category,
313                request_id, trace_id, span_id,
314                client_ip, user_agent, workspace_id, environment, metadata
315            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
316            ",
317        )
318        .bind(error.timestamp)
319        .bind(&error.protocol)
320        .bind(&error.method)
321        .bind(&error.endpoint)
322        .bind(error.status_code)
323        .bind(&error.error_type)
324        .bind(&error.error_message)
325        .bind(&error.error_category)
326        .bind(&error.request_id)
327        .bind(&error.trace_id)
328        .bind(&error.span_id)
329        .bind(&error.client_ip)
330        .bind(&error.user_agent)
331        .bind(&error.workspace_id)
332        .bind(&error.environment)
333        .bind(&error.metadata)
334        .execute(&self.pool)
335        .await?;
336
337        Ok(result.last_insert_rowid())
338    }
339
340    /// Insert a traffic pattern
341    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
342        sqlx::query(
343            r"
344            INSERT INTO traffic_patterns (
345                date, hour, day_of_week, protocol, workspace_id, environment,
346                request_count, error_count, avg_latency_ms, unique_clients
347            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
348            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
349            DO UPDATE SET
350                request_count = request_count + excluded.request_count,
351                error_count = error_count + excluded.error_count,
352                avg_latency_ms = excluded.avg_latency_ms,
353                unique_clients = excluded.unique_clients
354            ",
355        )
356        .bind(&pattern.date)
357        .bind(pattern.hour)
358        .bind(pattern.day_of_week)
359        .bind(&pattern.protocol)
360        .bind(&pattern.workspace_id)
361        .bind(&pattern.environment)
362        .bind(pattern.request_count)
363        .bind(pattern.error_count)
364        .bind(pattern.avg_latency_ms)
365        .bind(pattern.unique_clients)
366        .execute(&self.pool)
367        .await?;
368
369        Ok(())
370    }
371
372    /// Insert an analytics snapshot
373    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
374        let result = sqlx::query(
375            r"
376            INSERT INTO analytics_snapshots (
377                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
378                active_connections, protocol_stats, top_endpoints,
379                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
380                workspace_id, environment
381            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
382            ",
383        )
384        .bind(snapshot.timestamp)
385        .bind(&snapshot.snapshot_type)
386        .bind(snapshot.total_requests)
387        .bind(snapshot.total_errors)
388        .bind(snapshot.avg_latency_ms)
389        .bind(snapshot.active_connections)
390        .bind(&snapshot.protocol_stats)
391        .bind(&snapshot.top_endpoints)
392        .bind(snapshot.memory_usage_bytes)
393        .bind(snapshot.cpu_usage_percent)
394        .bind(snapshot.thread_count)
395        .bind(snapshot.uptime_seconds)
396        .bind(&snapshot.workspace_id)
397        .bind(&snapshot.environment)
398        .execute(&self.pool)
399        .await?;
400
401        Ok(result.last_insert_rowid())
402    }
403
404    // ========================================================================
405    // Query Operations
406    // ========================================================================
407
408    /// Get minute aggregates for a time range
409    pub async fn get_minute_aggregates(
410        &self,
411        filter: &AnalyticsFilter,
412    ) -> Result<Vec<MetricsAggregate>> {
413        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");
414
415        if filter.start_time.is_some() {
416            query.push_str(" AND timestamp >= ?");
417        }
418        if filter.end_time.is_some() {
419            query.push_str(" AND timestamp <= ?");
420        }
421        if filter.protocol.is_some() {
422            query.push_str(" AND protocol = ?");
423        }
424        if filter.endpoint.is_some() {
425            query.push_str(" AND endpoint = ?");
426        }
427        if filter.method.is_some() {
428            query.push_str(" AND method = ?");
429        }
430        if filter.status_code.is_some() {
431            query.push_str(" AND status_code = ?");
432        }
433        if filter.workspace_id.is_some() {
434            query.push_str(" AND workspace_id = ?");
435        }
436        if filter.environment.is_some() {
437            query.push_str(" AND environment = ?");
438        }
439
440        query.push_str(" ORDER BY timestamp DESC");
441
442        if filter.limit.is_some() {
443            query.push_str(" LIMIT ?");
444        }
445
446        // Build the query with bound parameters
447        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);
448
449        if let Some(start) = filter.start_time {
450            sql_query = sql_query.bind(start);
451        }
452        if let Some(end) = filter.end_time {
453            sql_query = sql_query.bind(end);
454        }
455        if let Some(ref protocol) = filter.protocol {
456            sql_query = sql_query.bind(protocol);
457        }
458        if let Some(ref endpoint) = filter.endpoint {
459            sql_query = sql_query.bind(endpoint);
460        }
461        if let Some(ref method) = filter.method {
462            sql_query = sql_query.bind(method);
463        }
464        if let Some(status) = filter.status_code {
465            sql_query = sql_query.bind(status);
466        }
467        if let Some(ref workspace) = filter.workspace_id {
468            sql_query = sql_query.bind(workspace);
469        }
470        if let Some(ref env) = filter.environment {
471            sql_query = sql_query.bind(env);
472        }
473        if let Some(limit) = filter.limit {
474            sql_query = sql_query.bind(limit);
475        }
476
477        let results = sql_query.fetch_all(&self.pool).await?;
478
479        Ok(results)
480    }
481
482    /// Get hour-level aggregates
483    pub async fn get_hour_aggregates(
484        &self,
485        filter: &AnalyticsFilter,
486    ) -> Result<Vec<HourMetricsAggregate>> {
487        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");
488
489        if filter.start_time.is_some() {
490            query.push_str(" AND timestamp >= ?");
491        }
492        if filter.end_time.is_some() {
493            query.push_str(" AND timestamp <= ?");
494        }
495        if filter.protocol.is_some() {
496            query.push_str(" AND protocol = ?");
497        }
498        if filter.endpoint.is_some() {
499            query.push_str(" AND endpoint = ?");
500        }
501        if filter.method.is_some() {
502            query.push_str(" AND method = ?");
503        }
504        if filter.status_code.is_some() {
505            query.push_str(" AND status_code = ?");
506        }
507        if filter.workspace_id.is_some() {
508            query.push_str(" AND workspace_id = ?");
509        }
510        if filter.environment.is_some() {
511            query.push_str(" AND environment = ?");
512        }
513
514        query.push_str(" ORDER BY timestamp DESC");
515
516        if filter.limit.is_some() {
517            query.push_str(" LIMIT ?");
518        }
519
520        // Build the query with bound parameters
521        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);
522
523        if let Some(start) = filter.start_time {
524            sql_query = sql_query.bind(start);
525        }
526        if let Some(end) = filter.end_time {
527            sql_query = sql_query.bind(end);
528        }
529        if let Some(ref protocol) = filter.protocol {
530            sql_query = sql_query.bind(protocol);
531        }
532        if let Some(ref endpoint) = filter.endpoint {
533            sql_query = sql_query.bind(endpoint);
534        }
535        if let Some(ref method) = filter.method {
536            sql_query = sql_query.bind(method);
537        }
538        if let Some(status) = filter.status_code {
539            sql_query = sql_query.bind(status);
540        }
541        if let Some(ref workspace) = filter.workspace_id {
542            sql_query = sql_query.bind(workspace);
543        }
544        if let Some(ref env) = filter.environment {
545            sql_query = sql_query.bind(env);
546        }
547        if let Some(limit) = filter.limit {
548            sql_query = sql_query.bind(limit);
549        }
550
551        let results = sql_query.fetch_all(&self.pool).await?;
552
553        Ok(results)
554    }
555
556    /// Get top endpoints by request count
557    pub async fn get_top_endpoints(
558        &self,
559        limit: i64,
560        workspace_id: Option<&str>,
561    ) -> Result<Vec<EndpointStats>> {
562        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
563
564        if workspace_id.is_some() {
565            query.push_str(" AND workspace_id = ?");
566        }
567
568        query.push_str(" ORDER BY total_requests DESC LIMIT ?");
569
570        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
571
572        if let Some(workspace) = workspace_id {
573            sql_query = sql_query.bind(workspace);
574        }
575
576        sql_query = sql_query.bind(limit);
577
578        let results = sql_query.fetch_all(&self.pool).await?;
579
580        Ok(results)
581    }
582
583    /// Get recent error events
584    pub async fn get_recent_errors(
585        &self,
586        limit: i64,
587        filter: &AnalyticsFilter,
588    ) -> Result<Vec<ErrorEvent>> {
589        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");
590
591        if filter.start_time.is_some() {
592            query.push_str(" AND timestamp >= ?");
593        }
594        if filter.end_time.is_some() {
595            query.push_str(" AND timestamp <= ?");
596        }
597        if filter.endpoint.is_some() {
598            query.push_str(" AND endpoint = ?");
599        }
600        if filter.workspace_id.is_some() {
601            query.push_str(" AND workspace_id = ?");
602        }
603
604        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
605
606        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);
607
608        if let Some(start) = filter.start_time {
609            sql_query = sql_query.bind(start);
610        }
611        if let Some(end) = filter.end_time {
612            sql_query = sql_query.bind(end);
613        }
614        if let Some(ref endpoint) = filter.endpoint {
615            sql_query = sql_query.bind(endpoint);
616        }
617        if let Some(ref workspace) = filter.workspace_id {
618            sql_query = sql_query.bind(workspace);
619        }
620
621        sql_query = sql_query.bind(limit);
622
623        let results = sql_query.fetch_all(&self.pool).await?;
624
625        Ok(results)
626    }
627
628    /// Get traffic patterns for heatmap
629    pub async fn get_traffic_patterns(
630        &self,
631        days: i64,
632        workspace_id: Option<&str>,
633    ) -> Result<Vec<TrafficPattern>> {
634        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
635        let start_date_str = start_date.format("%Y-%m-%d").to_string();
636
637        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
638
639        if let Some(_workspace) = workspace_id {
640            query.push_str(" AND workspace_id = ?");
641        }
642
643        query.push_str(" ORDER BY date ASC, hour ASC");
644
645        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
646
647        if let Some(workspace) = workspace_id {
648            query_builder = query_builder.bind(workspace);
649        }
650
651        let results = query_builder.fetch_all(&self.pool).await?;
652
653        Ok(results)
654    }
655
656    // ========================================================================
657    // Cleanup Operations
658    // ========================================================================
659
660    /// Delete old minute aggregates
661    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
662        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
663
664        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
665            .bind(cutoff)
666            .execute(&self.pool)
667            .await?;
668
669        info!(
670            "Cleaned up {} minute aggregates older than {} days",
671            result.rows_affected(),
672            days
673        );
674        Ok(result.rows_affected())
675    }
676
677    /// Delete old hour aggregates
678    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
679        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
680
681        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
682            .bind(cutoff)
683            .execute(&self.pool)
684            .await?;
685
686        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
687        Ok(result.rows_affected())
688    }
689
690    /// Delete old error events
691    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
692        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
693
694        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
695            .bind(cutoff)
696            .execute(&self.pool)
697            .await?;
698
699        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
700        Ok(result.rows_affected())
701    }
702
    /// Vacuum the database to reclaim space
    ///
    /// Runs SQLite's `VACUUM`, which rewrites the database file and releases
    /// pages freed by earlier deletes. NOTE(review): this can be slow on
    /// large databases — callers should avoid running it on a hot path.
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
710}
711
#[cfg(test)]
mod tests {
    use super::*;

    /// Connecting to an in-memory database and migrating should succeed.
    #[tokio::test]
    async fn test_database_creation() {
        let database = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        database.run_migrations().await.unwrap();
    }

    /// A representative minute aggregate should insert and get a rowid.
    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let database = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        database.run_migrations().await.unwrap();

        let sample = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        };

        let row_id = database.insert_minute_aggregate(&sample).await.unwrap();
        assert!(row_id > 0);
    }
}
754
755// ============================================================================
756// Coverage Metrics Operations (MockOps)
757// ============================================================================
758
759impl AnalyticsDatabase {
760    /// Record scenario usage
761    pub async fn record_scenario_usage(
762        &self,
763        scenario_id: &str,
764        workspace_id: Option<&str>,
765        org_id: Option<&str>,
766    ) -> Result<()> {
767        let now = chrono::Utc::now().timestamp();
768
769        // SQLite doesn't support ON CONFLICT with multiple columns easily, so use INSERT OR REPLACE
770        // First try to update existing record
771        let rows_affected = sqlx::query(
772            "UPDATE scenario_usage_metrics
773             SET usage_count = usage_count + 1,
774                 last_used_at = ?,
775                 updated_at = ?
776             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
777               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
778        )
779        .bind(now)
780        .bind(now)
781        .bind(scenario_id)
782        .bind(workspace_id)
783        .bind(workspace_id)
784        .bind(org_id)
785        .bind(org_id)
786        .execute(&self.pool)
787        .await?;
788
789        // If no rows were updated, insert a new record
790        if rows_affected.rows_affected() == 0 {
791            sqlx::query(
792                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
793                 VALUES (?, ?, ?, 1, ?, ?, ?)"
794            )
795            .bind(scenario_id)
796            .bind(workspace_id)
797            .bind(org_id)
798            .bind(now)
799            .bind(now)
800            .bind(now)
801            .execute(&self.pool)
802            .await?;
803        }
804
805        Ok(())
806    }
807
808    /// Record persona CI hit
809    pub async fn record_persona_ci_hit(
810        &self,
811        persona_id: &str,
812        workspace_id: Option<&str>,
813        org_id: Option<&str>,
814        ci_run_id: Option<&str>,
815    ) -> Result<()> {
816        let now = chrono::Utc::now().timestamp();
817
818        sqlx::query(
819            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
820             VALUES (?, ?, ?, ?, 1, ?)"
821        )
822        .bind(persona_id)
823        .bind(workspace_id)
824        .bind(org_id)
825        .bind(ci_run_id)
826        .bind(now)
827        .execute(&self.pool)
828        .await?;
829
830        Ok(())
831    }
832
    /// Record endpoint test coverage
    ///
    /// Upserts a coverage row keyed by (endpoint, method, protocol,
    /// workspace_id, org_id): bumps `test_count` and the timestamps on an
    /// existing row, otherwise inserts a fresh row with `test_count = 1`.
    ///
    /// # Arguments
    /// * `endpoint` - Endpoint identifier being tested
    /// * `method` - Optional method; NULL matches NULL in the row key
    /// * `protocol` - Protocol label (always present)
    /// * `workspace_id` / `org_id` - Optional tenancy scope (NULL matches NULL)
    /// * `coverage_percentage` - New value; `None` keeps the stored value
    ///
    /// # Errors
    /// Returns a database error if either statement fails.
    ///
    /// NOTE(review): this is update-then-insert, not an atomic upsert; two
    /// concurrent callers can both see 0 rows updated and insert duplicate
    /// rows unless the table enforces a unique key — confirm the schema.
    pub async fn record_endpoint_coverage(
        &self,
        endpoint: &str,
        method: Option<&str>,
        protocol: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        coverage_percentage: Option<f64>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // Try to update existing record
        //
        // `(col = ? OR (col IS NULL AND ? IS NULL))` is NULL-safe equality,
        // which is why each nullable key (method, workspace_id, org_id) is
        // bound twice below. `COALESCE(?, coverage_percentage)` preserves
        // the stored percentage when the caller passes `None`.
        let rows_affected = sqlx::query(
            "UPDATE endpoint_coverage
             SET test_count = test_count + 1,
                 last_tested_at = ?,
                 coverage_percentage = COALESCE(?, coverage_percentage),
                 updated_at = ?
             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
               AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        // Bind order mirrors placeholder order exactly: SET clause first,
        // then the WHERE key (doubled binds for the NULL-safe comparisons).
        .bind(now)
        .bind(coverage_percentage)
        .bind(now)
        .bind(endpoint)
        .bind(method)
        .bind(method)
        .bind(protocol)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // If no rows were updated, insert a new record
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
            )
            .bind(endpoint)
            .bind(method)
            .bind(protocol)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(coverage_percentage)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
891
892    /// Record reality level staleness
893    pub async fn record_reality_level_staleness(
894        &self,
895        workspace_id: &str,
896        org_id: Option<&str>,
897        endpoint: Option<&str>,
898        method: Option<&str>,
899        protocol: Option<&str>,
900        current_reality_level: Option<&str>,
901        staleness_days: Option<i32>,
902    ) -> Result<()> {
903        let now = chrono::Utc::now().timestamp();
904        let last_updated = if let Some(days) = staleness_days {
905            Some(now - (i64::from(days) * 86400))
906        } else {
907            Some(now)
908        };
909
910        sqlx::query(
911            "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
912             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
913             ON CONFLICT DO NOTHING"
914        )
915        .bind(workspace_id)
916        .bind(org_id)
917        .bind(endpoint)
918        .bind(method)
919        .bind(protocol)
920        .bind(current_reality_level)
921        .bind(last_updated)
922        .bind(staleness_days)
923        .bind(now)
924        .bind(now)
925        .execute(&self.pool)
926        .await?;
927
928        Ok(())
929    }
930
931    /// Record drift percentage metrics
932    pub async fn record_drift_percentage(
933        &self,
934        workspace_id: &str,
935        org_id: Option<&str>,
936        total_mocks: i64,
937        drifting_mocks: i64,
938    ) -> Result<()> {
939        let now = chrono::Utc::now().timestamp();
940        let drift_percentage = if total_mocks > 0 {
941            (drifting_mocks as f64 / total_mocks as f64) * 100.0
942        } else {
943            0.0
944        };
945
946        sqlx::query(
947            "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
948             VALUES (?, ?, ?, ?, ?, ?)"
949        )
950        .bind(workspace_id)
951        .bind(org_id)
952        .bind(total_mocks)
953        .bind(drifting_mocks)
954        .bind(drift_percentage)
955        .bind(now)
956        .execute(&self.pool)
957        .await?;
958
959        Ok(())
960    }
961
962    /// Get scenario usage metrics
963    pub async fn get_scenario_usage(
964        &self,
965        workspace_id: Option<&str>,
966        org_id: Option<&str>,
967        limit: Option<i64>,
968    ) -> Result<Vec<ScenarioUsageMetrics>> {
969        let limit = limit.unwrap_or(100);
970        let mut query = String::from(
971            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
972             FROM scenario_usage_metrics
973             WHERE 1=1"
974        );
975
976        if workspace_id.is_some() {
977            query.push_str(" AND workspace_id = ?");
978        }
979        if org_id.is_some() {
980            query.push_str(" AND org_id = ?");
981        }
982        query.push_str(" ORDER BY usage_count DESC LIMIT ?");
983
984        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
985        if let Some(ws_id) = workspace_id {
986            q = q.bind(ws_id);
987        }
988        if let Some(o_id) = org_id {
989            q = q.bind(o_id);
990        }
991        q = q.bind(limit);
992
993        let results = q.fetch_all(&self.pool).await?;
994        Ok(results)
995    }
996
997    /// Get persona CI hits
998    pub async fn get_persona_ci_hits(
999        &self,
1000        workspace_id: Option<&str>,
1001        org_id: Option<&str>,
1002        limit: Option<i64>,
1003    ) -> Result<Vec<PersonaCIHit>> {
1004        let limit = limit.unwrap_or(100);
1005        let mut query = String::from(
1006            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
1007             FROM persona_ci_hits
1008             WHERE 1=1",
1009        );
1010
1011        if workspace_id.is_some() {
1012            query.push_str(" AND workspace_id = ?");
1013        }
1014        if org_id.is_some() {
1015            query.push_str(" AND org_id = ?");
1016        }
1017        query.push_str(" ORDER BY hit_at DESC LIMIT ?");
1018
1019        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
1020        if let Some(ws_id) = workspace_id {
1021            q = q.bind(ws_id);
1022        }
1023        if let Some(o_id) = org_id {
1024            q = q.bind(o_id);
1025        }
1026        q = q.bind(limit);
1027
1028        let results = q.fetch_all(&self.pool).await?;
1029        Ok(results)
1030    }
1031
1032    /// Get endpoint coverage
1033    pub async fn get_endpoint_coverage(
1034        &self,
1035        workspace_id: Option<&str>,
1036        org_id: Option<&str>,
1037        min_coverage: Option<f64>,
1038    ) -> Result<Vec<EndpointCoverage>> {
1039        let mut query = String::from(
1040            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
1041             FROM endpoint_coverage
1042             WHERE 1=1"
1043        );
1044
1045        if workspace_id.is_some() {
1046            query.push_str(" AND workspace_id = ?");
1047        }
1048        if org_id.is_some() {
1049            query.push_str(" AND org_id = ?");
1050        }
1051        if min_coverage.is_some() {
1052            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
1053        }
1054        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");
1055
1056        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
1057        if let Some(ws_id) = workspace_id {
1058            q = q.bind(ws_id);
1059        }
1060        if let Some(o_id) = org_id {
1061            q = q.bind(o_id);
1062        }
1063        if let Some(min_cov) = min_coverage {
1064            q = q.bind(min_cov);
1065        }
1066
1067        let results = q.fetch_all(&self.pool).await?;
1068        Ok(results)
1069    }
1070
1071    /// Get reality level staleness
1072    pub async fn get_reality_level_staleness(
1073        &self,
1074        workspace_id: Option<&str>,
1075        org_id: Option<&str>,
1076        max_staleness_days: Option<i32>,
1077    ) -> Result<Vec<RealityLevelStaleness>> {
1078        let mut query = String::from(
1079            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
1080             FROM reality_level_staleness
1081             WHERE 1=1"
1082        );
1083
1084        if workspace_id.is_some() {
1085            query.push_str(" AND workspace_id = ?");
1086        }
1087        if org_id.is_some() {
1088            query.push_str(" AND org_id = ?");
1089        }
1090        if max_staleness_days.is_some() {
1091            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
1092        }
1093        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");
1094
1095        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
1096        if let Some(ws_id) = workspace_id {
1097            q = q.bind(ws_id);
1098        }
1099        if let Some(o_id) = org_id {
1100            q = q.bind(o_id);
1101        }
1102        if let Some(max_days) = max_staleness_days {
1103            q = q.bind(max_days);
1104        }
1105
1106        let results = q.fetch_all(&self.pool).await?;
1107        Ok(results)
1108    }
1109
1110    /// Get drift percentage metrics
1111    pub async fn get_drift_percentage(
1112        &self,
1113        workspace_id: Option<&str>,
1114        org_id: Option<&str>,
1115        limit: Option<i64>,
1116    ) -> Result<Vec<DriftPercentageMetrics>> {
1117        let limit = limit.unwrap_or(100);
1118        let mut query = String::from(
1119            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
1120             FROM drift_percentage_metrics
1121             WHERE 1=1"
1122        );
1123
1124        if workspace_id.is_some() {
1125            query.push_str(" AND workspace_id = ?");
1126        }
1127        if org_id.is_some() {
1128            query.push_str(" AND org_id = ?");
1129        }
1130        query.push_str(" ORDER BY measured_at DESC LIMIT ?");
1131
1132        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
1133        if let Some(ws_id) = workspace_id {
1134            q = q.bind(ws_id);
1135        }
1136        if let Some(o_id) = org_id {
1137            q = q.bind(o_id);
1138        }
1139        q = q.bind(limit);
1140
1141        let results = q.fetch_all(&self.pool).await?;
1142        Ok(results)
1143    }
1144}