Skip to main content

mockforge_analytics/
database.rs

1//! Database layer for analytics storage
2
3use crate::error::{AnalyticsError, Result};
4use crate::models::{
5    AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
6    EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
7    PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
8};
9use futures::TryStreamExt;
10use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
11use std::path::Path;
12use tracing::{debug, error, info};
13
14/// Analytics database manager
15#[derive(Clone)]
16pub struct AnalyticsDatabase {
17    pool: Pool<Sqlite>,
18}
19
20impl AnalyticsDatabase {
21    /// Create a new analytics database connection
22    ///
23    /// # Arguments
24    /// * `database_path` - Path to the `SQLite` database file (or ":memory:" for in-memory)
25    ///
26    /// # Errors
27    ///
28    /// Returns an error if the database connection cannot be established or
29    /// `SQLite` pragmas fail to execute.
30    pub async fn new(database_path: &Path) -> Result<Self> {
31        let db_url = if database_path.to_str() == Some(":memory:") {
32            "sqlite::memory:".to_string()
33        } else {
34            format!("sqlite://{}", database_path.display())
35        };
36
37        info!("Connecting to analytics database: {}", db_url);
38
39        let pool =
40            SqlitePoolOptions::new()
41                .max_connections(10)
42                .connect(&db_url)
43                .await
44                .map_err(|e| {
45                    error!("Failed to connect to analytics database: {}", e);
46                    AnalyticsError::Database(e)
47                })?;
48
49        // Enable WAL mode for better concurrency
50        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;
51
52        // Enable foreign keys
53        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;
54
55        Ok(Self { pool })
56    }
57
58    /// Run database migrations
59    ///
60    /// # Errors
61    ///
62    /// Returns an error if any migration SQL fails to execute.
63    pub async fn run_migrations(&self) -> Result<()> {
64        info!("Running analytics database migrations");
65
66        // Run initial schema migration
67        let migration_sql = include_str!("../migrations/001_analytics_schema.sql");
68        let mut conn = self.pool.acquire().await?;
69        let mut stream = conn.execute_many(migration_sql);
70
71        while stream
72            .try_next()
73            .await
74            .map_err(|e| {
75                error!("Migration error: {}", e);
76                AnalyticsError::Migration(format!("Failed to execute migration: {e}"))
77            })?
78            .is_some()
79        {}
80
81        // Run coverage metrics migration
82        let coverage_migration_sql = include_str!("../migrations/003_coverage_metrics.sql");
83        let mut conn = self.pool.acquire().await?;
84        let mut stream = conn.execute_many(coverage_migration_sql);
85
86        while stream
87            .try_next()
88            .await
89            .map_err(|e| {
90                error!("Coverage metrics migration error: {}", e);
91                AnalyticsError::Migration(format!(
92                    "Failed to execute coverage metrics migration: {e}"
93                ))
94            })?
95            .is_some()
96        {}
97
98        // Run pillar usage migration
99        let pillar_usage_migration_sql = include_str!("../migrations/002_pillar_usage.sql");
100        let mut conn = self.pool.acquire().await?;
101        let mut stream = conn.execute_many(pillar_usage_migration_sql);
102
103        while stream
104            .try_next()
105            .await
106            .map_err(|e| {
107                error!("Pillar usage migration error: {}", e);
108                AnalyticsError::Migration(format!("Failed to execute pillar usage migration: {e}"))
109            })?
110            .is_some()
111        {}
112
113        info!("Analytics database migrations completed successfully");
114        Ok(())
115    }
116
117    /// Get a reference to the database pool
118    #[must_use]
119    pub const fn pool(&self) -> &SqlitePool {
120        &self.pool
121    }
122
123    // ========================================================================
124    // Insert Operations
125    // ========================================================================
126
127    /// Insert a minute-level metrics aggregate
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if the database insert fails.
132    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
133        let result = sqlx::query(
134            "INSERT INTO metrics_aggregates_minute (
135                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
136                request_count, error_count, latency_sum, latency_min, latency_max,
137                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
138            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
139        )
140        .bind(agg.timestamp)
141        .bind(&agg.protocol)
142        .bind(&agg.method)
143        .bind(&agg.endpoint)
144        .bind(agg.status_code)
145        .bind(&agg.workspace_id)
146        .bind(&agg.environment)
147        .bind(agg.request_count)
148        .bind(agg.error_count)
149        .bind(agg.latency_sum)
150        .bind(agg.latency_min)
151        .bind(agg.latency_max)
152        .bind(agg.latency_p50)
153        .bind(agg.latency_p95)
154        .bind(agg.latency_p99)
155        .bind(agg.bytes_sent)
156        .bind(agg.bytes_received)
157        .bind(agg.active_connections)
158        .execute(&self.pool)
159        .await?;
160
161        Ok(result.last_insert_rowid())
162    }
163
164    /// Insert multiple minute-level aggregates in a batch
165    ///
166    /// # Errors
167    ///
168    /// Returns an error if any database insert fails; the transaction is rolled back.
169    pub async fn insert_minute_aggregates_batch(
170        &self,
171        aggregates: &[MetricsAggregate],
172    ) -> Result<()> {
173        if aggregates.is_empty() {
174            return Ok(());
175        }
176
177        let mut tx = self.pool.begin().await?;
178
179        for agg in aggregates {
180            sqlx::query(
181                r"
182                INSERT INTO metrics_aggregates_minute (
183                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
184                    request_count, error_count, latency_sum, latency_min, latency_max,
185                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
186                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
187                ",
188            )
189            .bind(agg.timestamp)
190            .bind(&agg.protocol)
191            .bind(&agg.method)
192            .bind(&agg.endpoint)
193            .bind(agg.status_code)
194            .bind(&agg.workspace_id)
195            .bind(&agg.environment)
196            .bind(agg.request_count)
197            .bind(agg.error_count)
198            .bind(agg.latency_sum)
199            .bind(agg.latency_min)
200            .bind(agg.latency_max)
201            .bind(agg.latency_p50)
202            .bind(agg.latency_p95)
203            .bind(agg.latency_p99)
204            .bind(agg.bytes_sent)
205            .bind(agg.bytes_received)
206            .bind(agg.active_connections)
207            .execute(&mut *tx)
208            .await?;
209        }
210
211        tx.commit().await?;
212        debug!("Inserted {} minute aggregates", aggregates.len());
213        Ok(())
214    }
215
216    /// Insert an hour-level metrics aggregate
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the database insert fails.
221    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
222        let result = sqlx::query(
223            r"
224            INSERT INTO metrics_aggregates_hour (
225                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
226                request_count, error_count, latency_sum, latency_min, latency_max,
227                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
228                active_connections_avg, active_connections_max
229            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
230            ",
231        )
232        .bind(agg.timestamp)
233        .bind(&agg.protocol)
234        .bind(&agg.method)
235        .bind(&agg.endpoint)
236        .bind(agg.status_code)
237        .bind(&agg.workspace_id)
238        .bind(&agg.environment)
239        .bind(agg.request_count)
240        .bind(agg.error_count)
241        .bind(agg.latency_sum)
242        .bind(agg.latency_min)
243        .bind(agg.latency_max)
244        .bind(agg.latency_p50)
245        .bind(agg.latency_p95)
246        .bind(agg.latency_p99)
247        .bind(agg.bytes_sent)
248        .bind(agg.bytes_received)
249        .bind(agg.active_connections_avg)
250        .bind(agg.active_connections_max)
251        .execute(&self.pool)
252        .await?;
253
254        Ok(result.last_insert_rowid())
255    }
256
257    /// Insert a day-level metrics aggregate
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if the database insert fails.
262    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
263        let result = sqlx::query(
264            r"
265            INSERT INTO metrics_aggregates_day (
266                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
267                request_count, error_count, latency_sum, latency_min, latency_max,
268                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
269                active_connections_avg, active_connections_max, unique_clients, peak_hour
270            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
271            ",
272        )
273        .bind(&agg.date)
274        .bind(agg.timestamp)
275        .bind(&agg.protocol)
276        .bind(&agg.method)
277        .bind(&agg.endpoint)
278        .bind(agg.status_code)
279        .bind(&agg.workspace_id)
280        .bind(&agg.environment)
281        .bind(agg.request_count)
282        .bind(agg.error_count)
283        .bind(agg.latency_sum)
284        .bind(agg.latency_min)
285        .bind(agg.latency_max)
286        .bind(agg.latency_p50)
287        .bind(agg.latency_p95)
288        .bind(agg.latency_p99)
289        .bind(agg.bytes_sent)
290        .bind(agg.bytes_received)
291        .bind(agg.active_connections_avg)
292        .bind(agg.active_connections_max)
293        .bind(agg.unique_clients)
294        .bind(agg.peak_hour)
295        .execute(&self.pool)
296        .await?;
297
298        Ok(result.last_insert_rowid())
299    }
300
301    /// Insert or update endpoint statistics
302    ///
303    /// # Errors
304    ///
305    /// Returns an error if the database upsert fails.
306    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
307        sqlx::query(
308            r"
309            INSERT INTO endpoint_stats (
310                endpoint, protocol, method, workspace_id, environment,
311                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
312                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
313                first_seen, last_seen
314            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
315            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
316            DO UPDATE SET
317                total_requests = total_requests + excluded.total_requests,
318                total_errors = total_errors + excluded.total_errors,
319                avg_latency_ms = excluded.avg_latency_ms,
320                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
321                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
322                p95_latency_ms = excluded.p95_latency_ms,
323                status_codes = excluded.status_codes,
324                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
325                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
326                last_seen = excluded.last_seen,
327                updated_at = strftime('%s', 'now')
328            ",
329        )
330        .bind(&stats.endpoint)
331        .bind(&stats.protocol)
332        .bind(&stats.method)
333        .bind(&stats.workspace_id)
334        .bind(&stats.environment)
335        .bind(stats.total_requests)
336        .bind(stats.total_errors)
337        .bind(stats.avg_latency_ms)
338        .bind(stats.min_latency_ms)
339        .bind(stats.max_latency_ms)
340        .bind(stats.p95_latency_ms)
341        .bind(&stats.status_codes)
342        .bind(stats.total_bytes_sent)
343        .bind(stats.total_bytes_received)
344        .bind(stats.first_seen)
345        .bind(stats.last_seen)
346        .execute(&self.pool)
347        .await?;
348
349        Ok(())
350    }
351
352    /// Insert an error event
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if the database insert fails.
357    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
358        let result = sqlx::query(
359            r"
360            INSERT INTO error_events (
361                timestamp, protocol, method, endpoint, status_code,
362                error_type, error_message, error_category,
363                request_id, trace_id, span_id,
364                client_ip, user_agent, workspace_id, environment, metadata
365            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
366            ",
367        )
368        .bind(error.timestamp)
369        .bind(&error.protocol)
370        .bind(&error.method)
371        .bind(&error.endpoint)
372        .bind(error.status_code)
373        .bind(&error.error_type)
374        .bind(&error.error_message)
375        .bind(&error.error_category)
376        .bind(&error.request_id)
377        .bind(&error.trace_id)
378        .bind(&error.span_id)
379        .bind(&error.client_ip)
380        .bind(&error.user_agent)
381        .bind(&error.workspace_id)
382        .bind(&error.environment)
383        .bind(&error.metadata)
384        .execute(&self.pool)
385        .await?;
386
387        Ok(result.last_insert_rowid())
388    }
389
390    /// Insert a traffic pattern
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the database upsert fails.
395    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
396        sqlx::query(
397            r"
398            INSERT INTO traffic_patterns (
399                date, hour, day_of_week, protocol, workspace_id, environment,
400                request_count, error_count, avg_latency_ms, unique_clients
401            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
402            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
403            DO UPDATE SET
404                request_count = request_count + excluded.request_count,
405                error_count = error_count + excluded.error_count,
406                avg_latency_ms = excluded.avg_latency_ms,
407                unique_clients = excluded.unique_clients
408            ",
409        )
410        .bind(&pattern.date)
411        .bind(pattern.hour)
412        .bind(pattern.day_of_week)
413        .bind(&pattern.protocol)
414        .bind(&pattern.workspace_id)
415        .bind(&pattern.environment)
416        .bind(pattern.request_count)
417        .bind(pattern.error_count)
418        .bind(pattern.avg_latency_ms)
419        .bind(pattern.unique_clients)
420        .execute(&self.pool)
421        .await?;
422
423        Ok(())
424    }
425
426    /// Insert an analytics snapshot
427    ///
428    /// # Errors
429    ///
430    /// Returns an error if the database insert fails.
431    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
432        let result = sqlx::query(
433            r"
434            INSERT INTO analytics_snapshots (
435                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
436                active_connections, protocol_stats, top_endpoints,
437                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
438                workspace_id, environment
439            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
440            ",
441        )
442        .bind(snapshot.timestamp)
443        .bind(&snapshot.snapshot_type)
444        .bind(snapshot.total_requests)
445        .bind(snapshot.total_errors)
446        .bind(snapshot.avg_latency_ms)
447        .bind(snapshot.active_connections)
448        .bind(&snapshot.protocol_stats)
449        .bind(&snapshot.top_endpoints)
450        .bind(snapshot.memory_usage_bytes)
451        .bind(snapshot.cpu_usage_percent)
452        .bind(snapshot.thread_count)
453        .bind(snapshot.uptime_seconds)
454        .bind(&snapshot.workspace_id)
455        .bind(&snapshot.environment)
456        .execute(&self.pool)
457        .await?;
458
459        Ok(result.last_insert_rowid())
460    }
461
462    // ========================================================================
463    // Query Operations
464    // ========================================================================
465
466    /// Get minute aggregates for a time range
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the database query fails.
471    pub async fn get_minute_aggregates(
472        &self,
473        filter: &AnalyticsFilter,
474    ) -> Result<Vec<MetricsAggregate>> {
475        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");
476
477        if filter.start_time.is_some() {
478            query.push_str(" AND timestamp >= ?");
479        }
480        if filter.end_time.is_some() {
481            query.push_str(" AND timestamp <= ?");
482        }
483        if filter.protocol.is_some() {
484            query.push_str(" AND protocol = ?");
485        }
486        if filter.endpoint.is_some() {
487            query.push_str(" AND endpoint = ?");
488        }
489        if filter.method.is_some() {
490            query.push_str(" AND method = ?");
491        }
492        if filter.status_code.is_some() {
493            query.push_str(" AND status_code = ?");
494        }
495        if filter.workspace_id.is_some() {
496            query.push_str(" AND workspace_id = ?");
497        }
498        if filter.environment.is_some() {
499            query.push_str(" AND environment = ?");
500        }
501
502        query.push_str(" ORDER BY timestamp DESC");
503
504        if filter.limit.is_some() {
505            query.push_str(" LIMIT ?");
506        }
507
508        // Build the query with bound parameters
509        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);
510
511        if let Some(start) = filter.start_time {
512            sql_query = sql_query.bind(start);
513        }
514        if let Some(end) = filter.end_time {
515            sql_query = sql_query.bind(end);
516        }
517        if let Some(ref protocol) = filter.protocol {
518            sql_query = sql_query.bind(protocol);
519        }
520        if let Some(ref endpoint) = filter.endpoint {
521            sql_query = sql_query.bind(endpoint);
522        }
523        if let Some(ref method) = filter.method {
524            sql_query = sql_query.bind(method);
525        }
526        if let Some(status) = filter.status_code {
527            sql_query = sql_query.bind(status);
528        }
529        if let Some(ref workspace) = filter.workspace_id {
530            sql_query = sql_query.bind(workspace);
531        }
532        if let Some(ref env) = filter.environment {
533            sql_query = sql_query.bind(env);
534        }
535        if let Some(limit) = filter.limit {
536            sql_query = sql_query.bind(limit);
537        }
538
539        let results = sql_query.fetch_all(&self.pool).await?;
540
541        Ok(results)
542    }
543
544    /// Get hour-level aggregates
545    ///
546    /// # Errors
547    ///
548    /// Returns an error if the database query fails.
549    pub async fn get_hour_aggregates(
550        &self,
551        filter: &AnalyticsFilter,
552    ) -> Result<Vec<HourMetricsAggregate>> {
553        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");
554
555        if filter.start_time.is_some() {
556            query.push_str(" AND timestamp >= ?");
557        }
558        if filter.end_time.is_some() {
559            query.push_str(" AND timestamp <= ?");
560        }
561        if filter.protocol.is_some() {
562            query.push_str(" AND protocol = ?");
563        }
564        if filter.endpoint.is_some() {
565            query.push_str(" AND endpoint = ?");
566        }
567        if filter.method.is_some() {
568            query.push_str(" AND method = ?");
569        }
570        if filter.status_code.is_some() {
571            query.push_str(" AND status_code = ?");
572        }
573        if filter.workspace_id.is_some() {
574            query.push_str(" AND workspace_id = ?");
575        }
576        if filter.environment.is_some() {
577            query.push_str(" AND environment = ?");
578        }
579
580        query.push_str(" ORDER BY timestamp DESC");
581
582        if filter.limit.is_some() {
583            query.push_str(" LIMIT ?");
584        }
585
586        // Build the query with bound parameters
587        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);
588
589        if let Some(start) = filter.start_time {
590            sql_query = sql_query.bind(start);
591        }
592        if let Some(end) = filter.end_time {
593            sql_query = sql_query.bind(end);
594        }
595        if let Some(ref protocol) = filter.protocol {
596            sql_query = sql_query.bind(protocol);
597        }
598        if let Some(ref endpoint) = filter.endpoint {
599            sql_query = sql_query.bind(endpoint);
600        }
601        if let Some(ref method) = filter.method {
602            sql_query = sql_query.bind(method);
603        }
604        if let Some(status) = filter.status_code {
605            sql_query = sql_query.bind(status);
606        }
607        if let Some(ref workspace) = filter.workspace_id {
608            sql_query = sql_query.bind(workspace);
609        }
610        if let Some(ref env) = filter.environment {
611            sql_query = sql_query.bind(env);
612        }
613        if let Some(limit) = filter.limit {
614            sql_query = sql_query.bind(limit);
615        }
616
617        let results = sql_query.fetch_all(&self.pool).await?;
618
619        Ok(results)
620    }
621
622    /// Get top endpoints by request count
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the database query fails.
627    pub async fn get_top_endpoints(
628        &self,
629        limit: i64,
630        workspace_id: Option<&str>,
631    ) -> Result<Vec<EndpointStats>> {
632        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
633
634        if workspace_id.is_some() {
635            query.push_str(" AND workspace_id = ?");
636        }
637
638        query.push_str(" ORDER BY total_requests DESC LIMIT ?");
639
640        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
641
642        if let Some(workspace) = workspace_id {
643            sql_query = sql_query.bind(workspace);
644        }
645
646        sql_query = sql_query.bind(limit);
647
648        let results = sql_query.fetch_all(&self.pool).await?;
649
650        Ok(results)
651    }
652
653    /// Get recent error events
654    ///
655    /// # Errors
656    ///
657    /// Returns an error if the database query fails.
658    pub async fn get_recent_errors(
659        &self,
660        limit: i64,
661        filter: &AnalyticsFilter,
662    ) -> Result<Vec<ErrorEvent>> {
663        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");
664
665        if filter.start_time.is_some() {
666            query.push_str(" AND timestamp >= ?");
667        }
668        if filter.end_time.is_some() {
669            query.push_str(" AND timestamp <= ?");
670        }
671        if filter.endpoint.is_some() {
672            query.push_str(" AND endpoint = ?");
673        }
674        if filter.workspace_id.is_some() {
675            query.push_str(" AND workspace_id = ?");
676        }
677
678        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
679
680        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);
681
682        if let Some(start) = filter.start_time {
683            sql_query = sql_query.bind(start);
684        }
685        if let Some(end) = filter.end_time {
686            sql_query = sql_query.bind(end);
687        }
688        if let Some(ref endpoint) = filter.endpoint {
689            sql_query = sql_query.bind(endpoint);
690        }
691        if let Some(ref workspace) = filter.workspace_id {
692            sql_query = sql_query.bind(workspace);
693        }
694
695        sql_query = sql_query.bind(limit);
696
697        let results = sql_query.fetch_all(&self.pool).await?;
698
699        Ok(results)
700    }
701
702    /// Get traffic patterns for heatmap
703    ///
704    /// # Errors
705    ///
706    /// Returns an error if the database query fails.
707    pub async fn get_traffic_patterns(
708        &self,
709        days: i64,
710        workspace_id: Option<&str>,
711    ) -> Result<Vec<TrafficPattern>> {
712        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
713        let start_date_str = start_date.format("%Y-%m-%d").to_string();
714
715        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
716
717        if workspace_id.is_some() {
718            query.push_str(" AND workspace_id = ?");
719        }
720
721        query.push_str(" ORDER BY date ASC, hour ASC");
722
723        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
724
725        if let Some(workspace) = workspace_id {
726            query_builder = query_builder.bind(workspace);
727        }
728
729        let results = query_builder.fetch_all(&self.pool).await?;
730
731        Ok(results)
732    }
733
734    // ========================================================================
735    // Cleanup Operations
736    // ========================================================================
737
738    /// Delete old minute aggregates
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the database delete fails.
743    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
744        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
745
746        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
747            .bind(cutoff)
748            .execute(&self.pool)
749            .await?;
750
751        info!(
752            "Cleaned up {} minute aggregates older than {} days",
753            result.rows_affected(),
754            days
755        );
756        Ok(result.rows_affected())
757    }
758
759    /// Delete old hour aggregates
760    ///
761    /// # Errors
762    ///
763    /// Returns an error if the database delete fails.
764    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
765        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
766
767        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
768            .bind(cutoff)
769            .execute(&self.pool)
770            .await?;
771
772        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
773        Ok(result.rows_affected())
774    }
775
776    /// Delete old error events
777    ///
778    /// # Errors
779    ///
780    /// Returns an error if the database delete fails.
781    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
782        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
783
784        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
785            .bind(cutoff)
786            .execute(&self.pool)
787            .await?;
788
789        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
790        Ok(result.rows_affected())
791    }
792
793    /// Vacuum the database to reclaim space
794    ///
795    /// # Errors
796    ///
797    /// Returns an error if the VACUUM command fails.
798    pub async fn vacuum(&self) -> Result<()> {
799        info!("Running VACUUM on analytics database");
800        sqlx::query("VACUUM").execute(&self.pool).await?;
801        info!("VACUUM completed");
802        Ok(())
803    }
804}
805
806// ============================================================================
807// Coverage Metrics Operations (MockOps)
808// ============================================================================
809
810impl AnalyticsDatabase {
811    /// Record scenario usage
812    ///
813    /// # Errors
814    ///
815    /// Returns an error if the database operation fails.
816    pub async fn record_scenario_usage(
817        &self,
818        scenario_id: &str,
819        workspace_id: Option<&str>,
820        org_id: Option<&str>,
821    ) -> Result<()> {
822        let now = chrono::Utc::now().timestamp();
823
824        // SQLite doesn't support ON CONFLICT with multiple columns easily, so use INSERT OR REPLACE
825        // First try to update existing record
826        let rows_affected = sqlx::query(
827            "UPDATE scenario_usage_metrics
828             SET usage_count = usage_count + 1,
829                 last_used_at = ?,
830                 updated_at = ?
831             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
832               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
833        )
834        .bind(now)
835        .bind(now)
836        .bind(scenario_id)
837        .bind(workspace_id)
838        .bind(workspace_id)
839        .bind(org_id)
840        .bind(org_id)
841        .execute(&self.pool)
842        .await?;
843
844        // If no rows were updated, insert a new record
845        if rows_affected.rows_affected() == 0 {
846            sqlx::query(
847                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
848                 VALUES (?, ?, ?, 1, ?, ?, ?)"
849            )
850            .bind(scenario_id)
851            .bind(workspace_id)
852            .bind(org_id)
853            .bind(now)
854            .bind(now)
855            .bind(now)
856            .execute(&self.pool)
857            .await?;
858        }
859
860        Ok(())
861    }
862
863    /// Record persona CI hit
864    ///
865    /// # Errors
866    ///
867    /// Returns an error if the database insert fails.
868    pub async fn record_persona_ci_hit(
869        &self,
870        persona_id: &str,
871        workspace_id: Option<&str>,
872        org_id: Option<&str>,
873        ci_run_id: Option<&str>,
874    ) -> Result<()> {
875        let now = chrono::Utc::now().timestamp();
876
877        sqlx::query(
878            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
879             VALUES (?, ?, ?, ?, 1, ?)"
880        )
881        .bind(persona_id)
882        .bind(workspace_id)
883        .bind(org_id)
884        .bind(ci_run_id)
885        .bind(now)
886        .execute(&self.pool)
887        .await?;
888
889        Ok(())
890    }
891
892    /// Record endpoint test coverage
893    ///
894    /// # Errors
895    ///
896    /// Returns an error if the database operation fails.
897    #[allow(clippy::too_many_arguments)]
898    pub async fn record_endpoint_coverage(
899        &self,
900        endpoint: &str,
901        method: Option<&str>,
902        protocol: &str,
903        workspace_id: Option<&str>,
904        org_id: Option<&str>,
905        coverage_percentage: Option<f64>,
906    ) -> Result<()> {
907        let now = chrono::Utc::now().timestamp();
908
909        // Try to update existing record
910        let rows_affected = sqlx::query(
911            "UPDATE endpoint_coverage
912             SET test_count = test_count + 1,
913                 last_tested_at = ?,
914                 coverage_percentage = COALESCE(?, coverage_percentage),
915                 updated_at = ?
916             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
917               AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
918               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
919        )
920        .bind(now)
921        .bind(coverage_percentage)
922        .bind(now)
923        .bind(endpoint)
924        .bind(method)
925        .bind(method)
926        .bind(protocol)
927        .bind(workspace_id)
928        .bind(workspace_id)
929        .bind(org_id)
930        .bind(org_id)
931        .execute(&self.pool)
932        .await?;
933
934        // If no rows were updated, insert a new record
935        if rows_affected.rows_affected() == 0 {
936            sqlx::query(
937                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
938                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
939            )
940            .bind(endpoint)
941            .bind(method)
942            .bind(protocol)
943            .bind(workspace_id)
944            .bind(org_id)
945            .bind(now)
946            .bind(coverage_percentage)
947            .bind(now)
948            .bind(now)
949            .execute(&self.pool)
950            .await?;
951        }
952
953        Ok(())
954    }
955
956    /// Record reality level staleness
957    ///
958    /// # Errors
959    ///
960    /// Returns an error if the database insert fails.
961    #[allow(clippy::too_many_arguments)]
962    pub async fn record_reality_level_staleness(
963        &self,
964        workspace_id: &str,
965        org_id: Option<&str>,
966        endpoint: Option<&str>,
967        method: Option<&str>,
968        protocol: Option<&str>,
969        current_reality_level: Option<&str>,
970        staleness_days: Option<i32>,
971    ) -> Result<()> {
972        let now = chrono::Utc::now().timestamp();
973        let last_updated = Some(staleness_days.map_or(now, |days| now - (i64::from(days) * 86400)));
974
975        sqlx::query(
976            "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
977             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
978             ON CONFLICT DO NOTHING"
979        )
980        .bind(workspace_id)
981        .bind(org_id)
982        .bind(endpoint)
983        .bind(method)
984        .bind(protocol)
985        .bind(current_reality_level)
986        .bind(last_updated)
987        .bind(staleness_days)
988        .bind(now)
989        .bind(now)
990        .execute(&self.pool)
991        .await?;
992
993        Ok(())
994    }
995
996    /// Record drift percentage metrics
997    ///
998    /// # Errors
999    ///
1000    /// Returns an error if the database insert fails.
1001    pub async fn record_drift_percentage(
1002        &self,
1003        workspace_id: &str,
1004        org_id: Option<&str>,
1005        total_mocks: i64,
1006        drifting_mocks: i64,
1007    ) -> Result<()> {
1008        let now = chrono::Utc::now().timestamp();
1009        #[allow(clippy::cast_precision_loss)]
1010        let drift_percentage = if total_mocks > 0 {
1011            (drifting_mocks as f64 / total_mocks as f64) * 100.0
1012        } else {
1013            0.0
1014        };
1015
1016        sqlx::query(
1017            "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
1018             VALUES (?, ?, ?, ?, ?, ?)"
1019        )
1020        .bind(workspace_id)
1021        .bind(org_id)
1022        .bind(total_mocks)
1023        .bind(drifting_mocks)
1024        .bind(drift_percentage)
1025        .bind(now)
1026        .execute(&self.pool)
1027        .await?;
1028
1029        Ok(())
1030    }
1031
1032    /// Get scenario usage metrics
1033    ///
1034    /// # Errors
1035    ///
1036    /// Returns an error if the database query fails.
1037    pub async fn get_scenario_usage(
1038        &self,
1039        workspace_id: Option<&str>,
1040        org_id: Option<&str>,
1041        limit: Option<i64>,
1042    ) -> Result<Vec<ScenarioUsageMetrics>> {
1043        let limit = limit.unwrap_or(100);
1044        let mut query = String::from(
1045            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
1046             FROM scenario_usage_metrics
1047             WHERE 1=1"
1048        );
1049
1050        if workspace_id.is_some() {
1051            query.push_str(" AND workspace_id = ?");
1052        }
1053        if org_id.is_some() {
1054            query.push_str(" AND org_id = ?");
1055        }
1056        query.push_str(" ORDER BY usage_count DESC LIMIT ?");
1057
1058        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
1059        if let Some(ws_id) = workspace_id {
1060            q = q.bind(ws_id);
1061        }
1062        if let Some(o_id) = org_id {
1063            q = q.bind(o_id);
1064        }
1065        q = q.bind(limit);
1066
1067        let results = q.fetch_all(&self.pool).await?;
1068        Ok(results)
1069    }
1070
1071    /// Get persona CI hits
1072    ///
1073    /// # Errors
1074    ///
1075    /// Returns an error if the database query fails.
1076    pub async fn get_persona_ci_hits(
1077        &self,
1078        workspace_id: Option<&str>,
1079        org_id: Option<&str>,
1080        limit: Option<i64>,
1081    ) -> Result<Vec<PersonaCIHit>> {
1082        let limit = limit.unwrap_or(100);
1083        let mut query = String::from(
1084            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
1085             FROM persona_ci_hits
1086             WHERE 1=1",
1087        );
1088
1089        if workspace_id.is_some() {
1090            query.push_str(" AND workspace_id = ?");
1091        }
1092        if org_id.is_some() {
1093            query.push_str(" AND org_id = ?");
1094        }
1095        query.push_str(" ORDER BY hit_at DESC LIMIT ?");
1096
1097        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
1098        if let Some(ws_id) = workspace_id {
1099            q = q.bind(ws_id);
1100        }
1101        if let Some(o_id) = org_id {
1102            q = q.bind(o_id);
1103        }
1104        q = q.bind(limit);
1105
1106        let results = q.fetch_all(&self.pool).await?;
1107        Ok(results)
1108    }
1109
1110    /// Get endpoint coverage
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if the database query fails.
1115    pub async fn get_endpoint_coverage(
1116        &self,
1117        workspace_id: Option<&str>,
1118        org_id: Option<&str>,
1119        min_coverage: Option<f64>,
1120    ) -> Result<Vec<EndpointCoverage>> {
1121        let mut query = String::from(
1122            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
1123             FROM endpoint_coverage
1124             WHERE 1=1"
1125        );
1126
1127        if workspace_id.is_some() {
1128            query.push_str(" AND workspace_id = ?");
1129        }
1130        if org_id.is_some() {
1131            query.push_str(" AND org_id = ?");
1132        }
1133        if min_coverage.is_some() {
1134            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
1135        }
1136        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");
1137
1138        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
1139        if let Some(ws_id) = workspace_id {
1140            q = q.bind(ws_id);
1141        }
1142        if let Some(o_id) = org_id {
1143            q = q.bind(o_id);
1144        }
1145        if let Some(min_cov) = min_coverage {
1146            q = q.bind(min_cov);
1147        }
1148
1149        let results = q.fetch_all(&self.pool).await?;
1150        Ok(results)
1151    }
1152
1153    /// Get reality level staleness
1154    ///
1155    /// # Errors
1156    ///
1157    /// Returns an error if the database query fails.
1158    pub async fn get_reality_level_staleness(
1159        &self,
1160        workspace_id: Option<&str>,
1161        org_id: Option<&str>,
1162        max_staleness_days: Option<i32>,
1163    ) -> Result<Vec<RealityLevelStaleness>> {
1164        let mut query = String::from(
1165            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
1166             FROM reality_level_staleness
1167             WHERE 1=1"
1168        );
1169
1170        if workspace_id.is_some() {
1171            query.push_str(" AND workspace_id = ?");
1172        }
1173        if org_id.is_some() {
1174            query.push_str(" AND org_id = ?");
1175        }
1176        if max_staleness_days.is_some() {
1177            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
1178        }
1179        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");
1180
1181        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
1182        if let Some(ws_id) = workspace_id {
1183            q = q.bind(ws_id);
1184        }
1185        if let Some(o_id) = org_id {
1186            q = q.bind(o_id);
1187        }
1188        if let Some(max_days) = max_staleness_days {
1189            q = q.bind(max_days);
1190        }
1191
1192        let results = q.fetch_all(&self.pool).await?;
1193        Ok(results)
1194    }
1195
1196    /// Get drift percentage metrics
1197    ///
1198    /// # Errors
1199    ///
1200    /// Returns an error if the database query fails.
1201    pub async fn get_drift_percentage(
1202        &self,
1203        workspace_id: Option<&str>,
1204        org_id: Option<&str>,
1205        limit: Option<i64>,
1206    ) -> Result<Vec<DriftPercentageMetrics>> {
1207        let limit = limit.unwrap_or(100);
1208        let mut query = String::from(
1209            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
1210             FROM drift_percentage_metrics
1211             WHERE 1=1"
1212        );
1213
1214        if workspace_id.is_some() {
1215            query.push_str(" AND workspace_id = ?");
1216        }
1217        if org_id.is_some() {
1218            query.push_str(" AND org_id = ?");
1219        }
1220        query.push_str(" ORDER BY measured_at DESC LIMIT ?");
1221
1222        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
1223        if let Some(ws_id) = workspace_id {
1224            q = q.bind(ws_id);
1225        }
1226        if let Some(o_id) = org_id {
1227            q = q.bind(o_id);
1228        }
1229        q = q.bind(limit);
1230
1231        let results = q.fetch_all(&self.pool).await?;
1232        Ok(results)
1233    }
1234}
1235
1236#[cfg(test)]
1237mod tests {
1238    use super::*;
1239
1240    #[tokio::test]
1241    async fn test_database_creation() {
1242        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
1243        db.run_migrations().await.unwrap();
1244    }
1245
1246    #[tokio::test]
1247    async fn test_insert_minute_aggregate() {
1248        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
1249        db.run_migrations().await.unwrap();
1250
1251        let agg = MetricsAggregate {
1252            id: None,
1253            timestamp: chrono::Utc::now().timestamp(),
1254            protocol: "HTTP".to_string(),
1255            method: Some("GET".to_string()),
1256            endpoint: Some("/api/test".to_string()),
1257            status_code: Some(200),
1258            workspace_id: None,
1259            environment: None,
1260            request_count: 100,
1261            error_count: 5,
1262            latency_sum: 500.0,
1263            latency_min: Some(10.0),
1264            latency_max: Some(100.0),
1265            latency_p50: Some(45.0),
1266            latency_p95: Some(95.0),
1267            latency_p99: Some(99.0),
1268            bytes_sent: 10_000,
1269            bytes_received: 5_000,
1270            active_connections: Some(10),
1271            created_at: None,
1272        };
1273
1274        let id = db.insert_minute_aggregate(&agg).await.unwrap();
1275        assert!(id > 0);
1276    }
1277}