mockforge_analytics/database.rs

//! Database layer for analytics storage

use crate::error::{AnalyticsError, Result};
use crate::models::{
    AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
    EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
    PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
};
use futures::TryStreamExt;
use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
use std::path::Path;
use tracing::{debug, error, info};
/// Analytics database manager
#[derive(Clone)]
pub struct AnalyticsDatabase {
    pool: Pool<Sqlite>,
}

impl AnalyticsDatabase {
    /// Create a new analytics database connection
    ///
    /// # Arguments
    /// * `database_path` - Path to the `SQLite` database file (or ":memory:" for in-memory)
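    ///
    /// # Example
    ///
    /// A minimal sketch of opening an in-memory database and applying the
    /// schema (marked `ignore` because the exact re-export path of this type
    /// is an assumption):
    ///
    /// ```rust,ignore
    /// use std::path::Path;
    ///
    /// let db = AnalyticsDatabase::new(Path::new(":memory:")).await?;
    /// db.run_migrations().await?;
    /// ```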
    pub async fn new(database_path: &Path) -> Result<Self> {
        let db_url = if database_path.to_str() == Some(":memory:") {
            "sqlite::memory:".to_string()
        } else {
            format!("sqlite://{}", database_path.display())
        };

        info!("Connecting to analytics database: {}", db_url);

        let pool = SqlitePoolOptions::new()
            .max_connections(10)
            .connect(&db_url)
            .await
            .map_err(|e| {
                error!("Failed to connect to analytics database: {}", e);
                AnalyticsError::Database(e)
            })?;

        // Enable WAL mode for better concurrency. journal_mode persists in
        // the database file, so setting it once covers the whole pool.
        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;

        // Enable foreign key enforcement. Unlike journal_mode, this pragma is
        // per-connection, so it only applies to the pooled connection that
        // happens to execute it here (see the sketch below).
        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;

        Ok(Self { pool })
    }

    /// Run database migrations
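    ///
    /// The migration SQL is embedded into the binary at compile time via
    /// `include_str!` and executed statement by statement.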
    pub async fn run_migrations(&self) -> Result<()> {
        info!("Running analytics database migrations");

        // Run initial schema migration
        let migration_sql = include_str!("../migrations/001_analytics_schema.sql");
        let mut conn = self.pool.acquire().await?;
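        // `execute_many` splits the multi-statement migration file and yields
        // one result per statement; drain the stream so every statement runs.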
        let mut stream = conn.execute_many(migration_sql);

        while let Some(_) = stream.try_next().await.map_err(|e| {
            error!("Migration error: {}", e);
            AnalyticsError::Migration(format!("Failed to execute migration: {e}"))
        })? {}

        // Run coverage metrics migration
        let coverage_migration_sql = include_str!("../migrations/002_coverage_metrics.sql");
        let mut conn = self.pool.acquire().await?;
        let mut stream = conn.execute_many(coverage_migration_sql);

        while let Some(_) = stream.try_next().await.map_err(|e| {
            error!("Coverage metrics migration error: {}", e);
            AnalyticsError::Migration(format!("Failed to execute coverage metrics migration: {e}"))
        })? {}

        info!("Analytics database migrations completed successfully");
        Ok(())
    }

    /// Get a reference to the database pool
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }

    // ========================================================================
    // Insert Operations
    // ========================================================================

    /// Insert a minute-level metrics aggregate
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert multiple minute-level aggregates in a batch
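    ///
    /// All rows are written inside a single transaction, so either every
    /// aggregate in the slice is committed or none are.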
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        if aggregates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }

    /// Insert an hour-level metrics aggregate
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert a day-level metrics aggregate
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert or update endpoint statistics
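    ///
    /// On conflict, the running counters (`total_requests`, `total_errors`,
    /// byte totals) accumulate, min/max latencies merge, and point-in-time
    /// fields (averages, percentiles, `status_codes`, `last_seen`) take the
    /// incoming value.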
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Insert an error event
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert a traffic pattern
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Insert an analytics snapshot
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    // ========================================================================
    // Query Operations
    // ========================================================================

    /// Get minute aggregates for a time range
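    ///
    /// Filter fields are combined with `AND`; unset fields are ignored.
    /// Results are ordered newest-first and capped by `filter.limit` when set.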
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        // Bind values in exactly the order their placeholders were appended above
        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Get hour-level aggregates
    pub async fn get_hour_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<HourMetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        // Bind values in exactly the order their placeholders were appended above
        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Get top endpoints by request count
    pub async fn get_top_endpoints(
        &self,
        limit: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<EndpointStats>> {
        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY total_requests DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);

        if let Some(workspace) = workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Get recent error events
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Get traffic patterns for heatmap
    pub async fn get_traffic_patterns(
        &self,
        days: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<TrafficPattern>> {
        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
        let start_date_str = start_date.format("%Y-%m-%d").to_string();

        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY date ASC, hour ASC");

        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);

        if let Some(workspace) = workspace_id {
            query_builder = query_builder.bind(workspace);
        }

        let results = query_builder.fetch_all(&self.pool).await?;

        Ok(results)
    }

    // ========================================================================
    // Cleanup Operations
    // ========================================================================

    /// Delete old minute aggregates
    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!(
            "Cleaned up {} minute aggregates older than {} days",
            result.rows_affected(),
            days
        );
        Ok(result.rows_affected())
    }

    /// Delete old hour aggregates
    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

    /// Delete old error events
    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);

        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

    /// Vacuum the database to reclaim space
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        };

        let id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(id > 0);
    }
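
    // A sketch of a batch-insert round trip. It assumes `AnalyticsFilter`
    // implements `Default` with "no filtering" semantics for unset fields;
    // adjust the filter construction if it does not.
    #[tokio::test]
    async fn test_batch_insert_round_trip() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 1,
            error_count: 0,
            latency_sum: 12.0,
            latency_min: Some(12.0),
            latency_max: Some(12.0),
            latency_p50: Some(12.0),
            latency_p95: Some(12.0),
            latency_p99: Some(12.0),
            bytes_sent: 100,
            bytes_received: 50,
            active_connections: Some(1),
            created_at: None,
        };

        // `from_ref` yields a one-element slice without requiring `Clone`.
        db.insert_minute_aggregates_batch(std::slice::from_ref(&agg)).await.unwrap();

        let filter = AnalyticsFilter { limit: Some(10), ..Default::default() };
        let rows = db.get_minute_aggregates(&filter).await.unwrap();
        assert_eq!(rows.len(), 1);
    }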
}

// ============================================================================
// Coverage Metrics Operations (MockOps)
// ============================================================================

impl AnalyticsDatabase {
    /// Record scenario usage
    pub async fn record_scenario_usage(
        &self,
        scenario_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // The identifying columns are nullable, and NULLs compare as distinct
        // in SQLite unique indexes, so a plain ON CONFLICT upsert would not
        // match here. Upsert manually instead: try an UPDATE first, then fall
        // back to INSERT. Note that without a transaction, two concurrent
        // callers can both miss the UPDATE and insert duplicate rows.
        let rows_affected = sqlx::query(
            "UPDATE scenario_usage_metrics
             SET usage_count = usage_count + 1,
                 last_used_at = ?,
                 updated_at = ?
             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(now)
        .bind(scenario_id)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // If no rows were updated, insert a new record
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
                 VALUES (?, ?, ?, 1, ?, ?, ?)"
            )
            .bind(scenario_id)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }

    /// Record persona CI hit
    pub async fn record_persona_ci_hit(
        &self,
        persona_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        ci_run_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        sqlx::query(
            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
             VALUES (?, ?, ?, ?, 1, ?)"
        )
        .bind(persona_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(ci_run_id)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Record endpoint test coverage
    pub async fn record_endpoint_coverage(
        &self,
        endpoint: &str,
        method: Option<&str>,
        protocol: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        coverage_percentage: Option<f64>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // Try to update existing record
        let rows_affected = sqlx::query(
            "UPDATE endpoint_coverage
             SET test_count = test_count + 1,
                 last_tested_at = ?,
                 coverage_percentage = COALESCE(?, coverage_percentage),
                 updated_at = ?
             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
               AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(coverage_percentage)
        .bind(now)
        .bind(endpoint)
        .bind(method)
        .bind(method)
        .bind(protocol)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // If no rows were updated, insert a new record
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
            )
            .bind(endpoint)
            .bind(method)
            .bind(protocol)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(coverage_percentage)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }

    /// Record reality level staleness
    pub async fn record_reality_level_staleness(
        &self,
        workspace_id: &str,
        org_id: Option<&str>,
        endpoint: Option<&str>,
        method: Option<&str>,
        protocol: Option<&str>,
        current_reality_level: Option<&str>,
        staleness_days: Option<i32>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();
        // Backdate the last-updated timestamp by the reported staleness;
        // no reported staleness means "updated just now".
        let last_updated = now - staleness_days.map_or(0, |days| i64::from(days) * 86400);

        sqlx::query(
            "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
             ON CONFLICT DO NOTHING"
        )
        .bind(workspace_id)
        .bind(org_id)
        .bind(endpoint)
        .bind(method)
        .bind(protocol)
        .bind(current_reality_level)
        .bind(last_updated)
        .bind(staleness_days)
        .bind(now)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Record drift percentage metrics
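    ///
    /// `drift_percentage` is derived here as `drifting_mocks / total_mocks * 100`,
    /// or `0.0` when `total_mocks` is zero.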
    pub async fn record_drift_percentage(
        &self,
        workspace_id: &str,
        org_id: Option<&str>,
        total_mocks: i64,
        drifting_mocks: i64,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();
        let drift_percentage = if total_mocks > 0 {
            (drifting_mocks as f64 / total_mocks as f64) * 100.0
        } else {
            0.0
        };

        sqlx::query(
            "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
             VALUES (?, ?, ?, ?, ?, ?)"
        )
        .bind(workspace_id)
        .bind(org_id)
        .bind(total_mocks)
        .bind(drifting_mocks)
        .bind(drift_percentage)
        .bind(now)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Get scenario usage metrics
    pub async fn get_scenario_usage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<ScenarioUsageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
             FROM scenario_usage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY usage_count DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Get persona CI hits
    pub async fn get_persona_ci_hits(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<PersonaCIHit>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
             FROM persona_ci_hits
             WHERE 1=1",
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY hit_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Get endpoint coverage, optionally restricted to endpoints whose
    /// measured coverage is below `min_coverage` (endpoints with no recorded
    /// coverage are always included)
    pub async fn get_endpoint_coverage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        min_coverage: Option<f64>,
    ) -> Result<Vec<EndpointCoverage>> {
        let mut query = String::from(
            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
             FROM endpoint_coverage
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if min_coverage.is_some() {
            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
        }
        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");

        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(min_cov) = min_coverage {
            q = q.bind(min_cov);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Get reality level staleness, optionally restricted to entries staler
    /// than `max_staleness_days` (entries with unknown staleness are always
    /// included)
    pub async fn get_reality_level_staleness(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        max_staleness_days: Option<i32>,
    ) -> Result<Vec<RealityLevelStaleness>> {
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
             FROM reality_level_staleness
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        if max_staleness_days.is_some() {
            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
        }
        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");

        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        if let Some(max_days) = max_staleness_days {
            q = q.bind(max_days);
        }

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }

    /// Get drift percentage metrics
    pub async fn get_drift_percentage(
        &self,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
        limit: Option<i64>,
    ) -> Result<Vec<DriftPercentageMetrics>> {
        let limit = limit.unwrap_or(100);
        let mut query = String::from(
            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
             FROM drift_percentage_metrics
             WHERE 1=1"
        );

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if org_id.is_some() {
            query.push_str(" AND org_id = ?");
        }
        query.push_str(" ORDER BY measured_at DESC LIMIT ?");

        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
        if let Some(ws_id) = workspace_id {
            q = q.bind(ws_id);
        }
        if let Some(o_id) = org_id {
            q = q.bind(o_id);
        }
        q = q.bind(limit);

        let results = q.fetch_all(&self.pool).await?;
        Ok(results)
    }
}