//! Database layer for analytics storage

use crate::error::{AnalyticsError, Result};
use crate::models::{
    AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
    EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
    PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
};
use futures::TryStreamExt;
use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
use std::path::Path;
use tracing::{debug, error, info};
/// Analytics database manager
///
/// Cheap to clone: it wraps a shared `sqlx` connection pool, so clones
/// share the same underlying connections.
#[derive(Clone)]
pub struct AnalyticsDatabase {
    // Shared SQLite connection pool; every query in this module goes through it.
    pool: Pool<Sqlite>,
}

impl AnalyticsDatabase {
21    /// Create a new analytics database connection
22    ///
23    /// # Arguments
24    /// * `database_path` - Path to the `SQLite` database file (or ":memory:" for in-memory)
25    ///
26    /// # Errors
27    ///
28    /// Returns an error if the database connection cannot be established or
29    /// `SQLite` pragmas fail to execute.
30    pub async fn new(database_path: &Path) -> Result<Self> {
31        let db_url = if database_path.to_str() == Some(":memory:") {
32            "sqlite::memory:".to_string()
33        } else {
34            // Ensure parent directory exists so SQLite can create the file.
35            if let Some(parent) = database_path.parent() {
36                if !parent.as_os_str().is_empty() && !parent.exists() {
37                    std::fs::create_dir_all(parent).map_err(|e| {
38                        error!("Failed to create analytics database directory: {}", e);
39                        AnalyticsError::Database(sqlx::Error::Io(e))
40                    })?;
41                }
42            }
43            format!("sqlite://{}?mode=rwc", database_path.display())
44        };
45
46        info!("Connecting to analytics database: {}", db_url);
47
48        let pool =
49            SqlitePoolOptions::new()
50                .max_connections(10)
51                .connect(&db_url)
52                .await
53                .map_err(|e| {
54                    error!("Failed to connect to analytics database: {}", e);
55                    AnalyticsError::Database(e)
56                })?;
57
58        // Enable WAL mode for better concurrency
59        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;
60
61        // Enable foreign keys
62        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;
63
64        Ok(Self { pool })
65    }
66
67    /// Run database migrations
68    ///
69    /// Migrations use `IF NOT EXISTS` guards so they are safe to re-run on
70    /// server restart against an existing database.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if any migration SQL fails to execute (other than
75    /// benign "already exists" errors which are logged as warnings).
76    pub async fn run_migrations(&self) -> Result<()> {
77        info!("Running analytics database migrations");
78
79        Self::run_migration_file(
80            &self.pool,
81            "001_analytics_schema",
82            include_str!("../migrations/001_analytics_schema.sql"),
83        )
84        .await?;
85        Self::run_migration_file(
86            &self.pool,
87            "003_coverage_metrics",
88            include_str!("../migrations/003_coverage_metrics.sql"),
89        )
90        .await?;
91        Self::run_migration_file(
92            &self.pool,
93            "002_pillar_usage",
94            include_str!("../migrations/002_pillar_usage.sql"),
95        )
96        .await?;
97
98        info!("Analytics database migrations completed successfully");
99        Ok(())
100    }
101
    /// Execute a single migration file, treating "already exists" errors as
    /// warnings rather than hard failures.  This provides defence-in-depth on
    /// top of the `IF NOT EXISTS` guards in the SQL itself.
    async fn run_migration_file(pool: &Pool<Sqlite>, name: &str, sql: &str) -> Result<()> {
        debug!("Running migration: {}", name);
        // `execute_many` streams the script one statement at a time, so it
        // needs a dedicated connection rather than the pool itself.
        let mut conn = pool.acquire().await?;
        let mut stream = conn.execute_many(sql);

        // `try_next()` yields `Result<Option<_>>`; `transpose()` flips it to
        // `Option<Result<_>>` so the loop terminates cleanly at end-of-stream
        // while still surfacing each per-statement error for inspection.
        while let Some(result) = stream.try_next().await.transpose() {
            match result {
                Ok(_) => {}
                Err(e) => {
                    let msg = e.to_string();
                    // String matching on the error text is brittle, but SQLite
                    // exposes no structured "object already exists" code here.
                    if msg.contains("already exists") {
                        debug!(
                            "Migration {}: object already exists (safe to ignore): {}",
                            name, msg
                        );
                    } else {
                        error!("Migration {} error: {}", name, e);
                        return Err(AnalyticsError::Migration(format!(
                            "Failed to execute migration {name}: {e}"
                        )));
                    }
                }
            }
        }
        Ok(())
    }
131
    /// Get a reference to the database pool
    ///
    /// Exposed so callers can run ad-hoc queries against the same connections.
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }

    // ========================================================================
    // Insert Operations
    // ========================================================================

    /// Insert a minute-level metrics aggregate
    ///
    /// Returns the rowid of the newly inserted record.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insert fails.
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        // NOTE: the `.bind()` order below must stay in lockstep with the
        // column list in the INSERT statement.
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
178
    /// Insert multiple minute-level aggregates in a batch
    ///
    /// All rows are written inside a single transaction, so either every
    /// aggregate is inserted or none are.
    ///
    /// # Errors
    ///
    /// Returns an error if any database insert fails; the transaction is rolled back.
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        // Skip the transaction overhead entirely for an empty batch.
        if aggregates.is_empty() {
            return Ok(());
        }

        // If any insert below fails, `tx` is dropped without `commit()`,
        // which rolls the whole batch back.
        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            // Bind order must match the column list in the INSERT statement.
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }
230
    /// Insert an hour-level metrics aggregate
    ///
    /// Hour rows track average and maximum connection counts (rather than the
    /// single instantaneous value stored at minute granularity).
    ///
    /// Returns the rowid of the newly inserted record.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insert fails.
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        // Bind order must match the column list in the INSERT statement.
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
271
    /// Insert a day-level metrics aggregate
    ///
    /// Day rows additionally record a calendar `date` string, the distinct
    /// client count, and the busiest hour of the day.
    ///
    /// Returns the rowid of the newly inserted record.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insert fails.
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        // Bind order must match the column list in the INSERT statement.
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
315
    /// Insert or update endpoint statistics
    ///
    /// On conflict, monotonic counters (request/error/byte totals) are
    /// accumulated, min/max latencies are merged, and point-in-time gauges
    /// (avg/p95 latency, status-code map, last_seen) are replaced with the
    /// incoming values.
    ///
    /// NOTE(review): the ON CONFLICT target uses COALESCE expressions, which
    /// only works if the table declares a matching unique index on those same
    /// expressions — confirm against 001_analytics_schema.sql.
    ///
    /// # Errors
    ///
    /// Returns an error if the database upsert fails.
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        // Bind order must match the column list in the INSERT statement.
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
366
    /// Insert an error event
    ///
    /// Each event is stored as its own row (append-only); returns the rowid
    /// of the newly inserted record.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insert fails.
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        // Bind order must match the column list in the INSERT statement.
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }
404
    /// Insert a traffic pattern
    ///
    /// Upserts one (date, hour, protocol, workspace, environment) bucket:
    /// request and error counts are accumulated on conflict, while the
    /// latency average and unique-client gauge are replaced with the
    /// incoming values.
    ///
    /// # Errors
    ///
    /// Returns an error if the database upsert fails.
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        // Bind order must match the column list in the INSERT statement.
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }
440
    /// Insert an analytics snapshot
    ///
    /// Stores a point-in-time system snapshot (traffic totals plus process
    /// resource usage); returns the rowid of the newly inserted record.
    ///
    /// # Errors
    ///
    /// Returns an error if the database insert fails.
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        // Bind order must match the column list in the INSERT statement.
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    // ========================================================================
    // Query Operations
    // ========================================================================

    /// Get minute aggregates for a time range
    ///
    /// The SQL is assembled dynamically from whichever filter fields are set;
    /// results are ordered newest-first and optionally capped by `filter.limit`.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        // "WHERE 1=1" lets every optional condition be appended uniformly
        // with " AND ...".
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        // Build the query with bound parameters.
        // CAUTION: the bind sequence below must mirror the placeholder
        // append order above exactly — SQLite matches `?` positionally.
        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
558
    /// Get hour-level aggregates
    ///
    /// Mirrors [`Self::get_minute_aggregates`] but reads from the hourly
    /// rollup table; results are ordered newest-first and optionally capped
    /// by `filter.limit`.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn get_hour_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<HourMetricsAggregate>> {
        // "WHERE 1=1" lets every optional condition be appended uniformly.
        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        // Build the query with bound parameters.
        // CAUTION: the bind sequence below must mirror the placeholder
        // append order above exactly — SQLite matches `?` positionally.
        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
636
637    /// Get top endpoints by request count
638    ///
639    /// # Errors
640    ///
641    /// Returns an error if the database query fails.
642    pub async fn get_top_endpoints(
643        &self,
644        limit: i64,
645        workspace_id: Option<&str>,
646    ) -> Result<Vec<EndpointStats>> {
647        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
648
649        if workspace_id.is_some() {
650            query.push_str(" AND workspace_id = ?");
651        }
652
653        query.push_str(" ORDER BY total_requests DESC LIMIT ?");
654
655        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
656
657        if let Some(workspace) = workspace_id {
658            sql_query = sql_query.bind(workspace);
659        }
660
661        sql_query = sql_query.bind(limit);
662
663        let results = sql_query.fetch_all(&self.pool).await?;
664
665        Ok(results)
666    }
667
    /// Get recent error events
    ///
    /// Filters on time range, endpoint and workspace (when set), returning
    /// the newest `limit` matching events.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        // "WHERE 1=1" lets every optional condition be appended uniformly.
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        // CAUTION: the bind sequence below must mirror the placeholder
        // append order above; `limit` is bound last to match the trailing
        // "LIMIT ?".
        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }
716
717    /// Get traffic patterns for heatmap
718    ///
719    /// # Errors
720    ///
721    /// Returns an error if the database query fails.
722    pub async fn get_traffic_patterns(
723        &self,
724        days: i64,
725        workspace_id: Option<&str>,
726    ) -> Result<Vec<TrafficPattern>> {
727        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
728        let start_date_str = start_date.format("%Y-%m-%d").to_string();
729
730        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
731
732        if workspace_id.is_some() {
733            query.push_str(" AND workspace_id = ?");
734        }
735
736        query.push_str(" ORDER BY date ASC, hour ASC");
737
738        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
739
740        if let Some(workspace) = workspace_id {
741            query_builder = query_builder.bind(workspace);
742        }
743
744        let results = query_builder.fetch_all(&self.pool).await?;
745
746        Ok(results)
747    }

    // ========================================================================
    // Cleanup Operations
    // ========================================================================

753    /// Delete old minute aggregates
754    ///
755    /// # Errors
756    ///
757    /// Returns an error if the database delete fails.
758    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
759        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
760
761        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
762            .bind(cutoff)
763            .execute(&self.pool)
764            .await?;
765
766        info!(
767            "Cleaned up {} minute aggregates older than {} days",
768            result.rows_affected(),
769            days
770        );
771        Ok(result.rows_affected())
772    }
773
774    /// Delete old hour aggregates
775    ///
776    /// # Errors
777    ///
778    /// Returns an error if the database delete fails.
779    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
780        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
781
782        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
783            .bind(cutoff)
784            .execute(&self.pool)
785            .await?;
786
787        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
788        Ok(result.rows_affected())
789    }
790
791    /// Delete old error events
792    ///
793    /// # Errors
794    ///
795    /// Returns an error if the database delete fails.
796    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
797        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
798
799        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
800            .bind(cutoff)
801            .execute(&self.pool)
802            .await?;
803
804        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
805        Ok(result.rows_affected())
806    }
807
    /// Vacuum the database to reclaim space
    ///
    /// VACUUM rewrites the database file, dropping free pages left behind by
    /// deletes; it can be slow on large databases, so prefer running it
    /// during idle periods.
    ///
    /// # Errors
    ///
    /// Returns an error if the VACUUM command fails.
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
}

// ============================================================================
// Coverage Metrics Operations (MockOps)
// ============================================================================

impl AnalyticsDatabase {
    /// Record scenario usage
    ///
    /// Increments the usage counter for a scenario within a workspace/org
    /// scope, creating the row on first use.
    ///
    /// # Errors
    ///
    /// Returns an error if the database operation fails.
    pub async fn record_scenario_usage(
        &self,
        scenario_id: &str,
        workspace_id: Option<&str>,
        org_id: Option<&str>,
    ) -> Result<()> {
        let now = chrono::Utc::now().timestamp();

        // SQLite doesn't support ON CONFLICT with multiple columns easily, so use INSERT OR REPLACE
        // First try to update existing record
        //
        // The "(col = ? OR (col IS NULL AND ? IS NULL))" pattern makes the
        // match NULL-safe: a plain "col = NULL" comparison never matches in
        // SQL, so each optional scope value is bound twice.
        let rows_affected = sqlx::query(
            "UPDATE scenario_usage_metrics
             SET usage_count = usage_count + 1,
                 last_used_at = ?,
                 updated_at = ?
             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
        )
        .bind(now)
        .bind(now)
        .bind(scenario_id)
        .bind(workspace_id)
        .bind(workspace_id)
        .bind(org_id)
        .bind(org_id)
        .execute(&self.pool)
        .await?;

        // If no rows were updated, insert a new record
        //
        // NOTE(review): this update-then-insert pair is not atomic; two
        // concurrent first-time calls could both take the insert path and
        // create duplicate rows — confirm whether a unique index on
        // (scenario_id, workspace_id, org_id) guards against this.
        if rows_affected.rows_affected() == 0 {
            sqlx::query(
                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
                 VALUES (?, ?, ?, 1, ?, ?, ?)"
            )
            .bind(scenario_id)
            .bind(workspace_id)
            .bind(org_id)
            .bind(now)
            .bind(now)
            .bind(now)
            .execute(&self.pool)
            .await?;
        }

        Ok(())
    }
877
878    /// Record persona CI hit
879    ///
880    /// # Errors
881    ///
882    /// Returns an error if the database insert fails.
883    pub async fn record_persona_ci_hit(
884        &self,
885        persona_id: &str,
886        workspace_id: Option<&str>,
887        org_id: Option<&str>,
888        ci_run_id: Option<&str>,
889    ) -> Result<()> {
890        let now = chrono::Utc::now().timestamp();
891
892        sqlx::query(
893            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
894             VALUES (?, ?, ?, ?, 1, ?)"
895        )
896        .bind(persona_id)
897        .bind(workspace_id)
898        .bind(org_id)
899        .bind(ci_run_id)
900        .bind(now)
901        .execute(&self.pool)
902        .await?;
903
904        Ok(())
905    }
906
907    /// Record endpoint test coverage
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if the database operation fails.
912    #[allow(clippy::too_many_arguments)]
913    pub async fn record_endpoint_coverage(
914        &self,
915        endpoint: &str,
916        method: Option<&str>,
917        protocol: &str,
918        workspace_id: Option<&str>,
919        org_id: Option<&str>,
920        coverage_percentage: Option<f64>,
921    ) -> Result<()> {
922        let now = chrono::Utc::now().timestamp();
923
924        // Try to update existing record
925        let rows_affected = sqlx::query(
926            "UPDATE endpoint_coverage
927             SET test_count = test_count + 1,
928                 last_tested_at = ?,
929                 coverage_percentage = COALESCE(?, coverage_percentage),
930                 updated_at = ?
931             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
932               AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
933               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
934        )
935        .bind(now)
936        .bind(coverage_percentage)
937        .bind(now)
938        .bind(endpoint)
939        .bind(method)
940        .bind(method)
941        .bind(protocol)
942        .bind(workspace_id)
943        .bind(workspace_id)
944        .bind(org_id)
945        .bind(org_id)
946        .execute(&self.pool)
947        .await?;
948
949        // If no rows were updated, insert a new record
950        if rows_affected.rows_affected() == 0 {
951            sqlx::query(
952                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
953                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
954            )
955            .bind(endpoint)
956            .bind(method)
957            .bind(protocol)
958            .bind(workspace_id)
959            .bind(org_id)
960            .bind(now)
961            .bind(coverage_percentage)
962            .bind(now)
963            .bind(now)
964            .execute(&self.pool)
965            .await?;
966        }
967
968        Ok(())
969    }
970
971    /// Record reality level staleness
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the database insert fails.
976    #[allow(clippy::too_many_arguments)]
977    pub async fn record_reality_level_staleness(
978        &self,
979        workspace_id: &str,
980        org_id: Option<&str>,
981        endpoint: Option<&str>,
982        method: Option<&str>,
983        protocol: Option<&str>,
984        current_reality_level: Option<&str>,
985        staleness_days: Option<i32>,
986    ) -> Result<()> {
987        let now = chrono::Utc::now().timestamp();
988        let last_updated = Some(staleness_days.map_or(now, |days| now - (i64::from(days) * 86400)));
989
990        sqlx::query(
991            "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
992             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
993             ON CONFLICT DO NOTHING"
994        )
995        .bind(workspace_id)
996        .bind(org_id)
997        .bind(endpoint)
998        .bind(method)
999        .bind(protocol)
1000        .bind(current_reality_level)
1001        .bind(last_updated)
1002        .bind(staleness_days)
1003        .bind(now)
1004        .bind(now)
1005        .execute(&self.pool)
1006        .await?;
1007
1008        Ok(())
1009    }
1010
1011    /// Record drift percentage metrics
1012    ///
1013    /// # Errors
1014    ///
1015    /// Returns an error if the database insert fails.
1016    pub async fn record_drift_percentage(
1017        &self,
1018        workspace_id: &str,
1019        org_id: Option<&str>,
1020        total_mocks: i64,
1021        drifting_mocks: i64,
1022    ) -> Result<()> {
1023        let now = chrono::Utc::now().timestamp();
1024        #[allow(clippy::cast_precision_loss)]
1025        let drift_percentage = if total_mocks > 0 {
1026            (drifting_mocks as f64 / total_mocks as f64) * 100.0
1027        } else {
1028            0.0
1029        };
1030
1031        sqlx::query(
1032            "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
1033             VALUES (?, ?, ?, ?, ?, ?)"
1034        )
1035        .bind(workspace_id)
1036        .bind(org_id)
1037        .bind(total_mocks)
1038        .bind(drifting_mocks)
1039        .bind(drift_percentage)
1040        .bind(now)
1041        .execute(&self.pool)
1042        .await?;
1043
1044        Ok(())
1045    }
1046
1047    /// Get scenario usage metrics
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if the database query fails.
1052    pub async fn get_scenario_usage(
1053        &self,
1054        workspace_id: Option<&str>,
1055        org_id: Option<&str>,
1056        limit: Option<i64>,
1057    ) -> Result<Vec<ScenarioUsageMetrics>> {
1058        let limit = limit.unwrap_or(100);
1059        let mut query = String::from(
1060            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
1061             FROM scenario_usage_metrics
1062             WHERE 1=1"
1063        );
1064
1065        if workspace_id.is_some() {
1066            query.push_str(" AND workspace_id = ?");
1067        }
1068        if org_id.is_some() {
1069            query.push_str(" AND org_id = ?");
1070        }
1071        query.push_str(" ORDER BY usage_count DESC LIMIT ?");
1072
1073        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
1074        if let Some(ws_id) = workspace_id {
1075            q = q.bind(ws_id);
1076        }
1077        if let Some(o_id) = org_id {
1078            q = q.bind(o_id);
1079        }
1080        q = q.bind(limit);
1081
1082        let results = q.fetch_all(&self.pool).await?;
1083        Ok(results)
1084    }
1085
1086    /// Get persona CI hits
1087    ///
1088    /// # Errors
1089    ///
1090    /// Returns an error if the database query fails.
1091    pub async fn get_persona_ci_hits(
1092        &self,
1093        workspace_id: Option<&str>,
1094        org_id: Option<&str>,
1095        limit: Option<i64>,
1096    ) -> Result<Vec<PersonaCIHit>> {
1097        let limit = limit.unwrap_or(100);
1098        let mut query = String::from(
1099            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
1100             FROM persona_ci_hits
1101             WHERE 1=1",
1102        );
1103
1104        if workspace_id.is_some() {
1105            query.push_str(" AND workspace_id = ?");
1106        }
1107        if org_id.is_some() {
1108            query.push_str(" AND org_id = ?");
1109        }
1110        query.push_str(" ORDER BY hit_at DESC LIMIT ?");
1111
1112        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
1113        if let Some(ws_id) = workspace_id {
1114            q = q.bind(ws_id);
1115        }
1116        if let Some(o_id) = org_id {
1117            q = q.bind(o_id);
1118        }
1119        q = q.bind(limit);
1120
1121        let results = q.fetch_all(&self.pool).await?;
1122        Ok(results)
1123    }
1124
1125    /// Get endpoint coverage
1126    ///
1127    /// # Errors
1128    ///
1129    /// Returns an error if the database query fails.
1130    pub async fn get_endpoint_coverage(
1131        &self,
1132        workspace_id: Option<&str>,
1133        org_id: Option<&str>,
1134        min_coverage: Option<f64>,
1135    ) -> Result<Vec<EndpointCoverage>> {
1136        let mut query = String::from(
1137            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
1138             FROM endpoint_coverage
1139             WHERE 1=1"
1140        );
1141
1142        if workspace_id.is_some() {
1143            query.push_str(" AND workspace_id = ?");
1144        }
1145        if org_id.is_some() {
1146            query.push_str(" AND org_id = ?");
1147        }
1148        if min_coverage.is_some() {
1149            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
1150        }
1151        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");
1152
1153        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
1154        if let Some(ws_id) = workspace_id {
1155            q = q.bind(ws_id);
1156        }
1157        if let Some(o_id) = org_id {
1158            q = q.bind(o_id);
1159        }
1160        if let Some(min_cov) = min_coverage {
1161            q = q.bind(min_cov);
1162        }
1163
1164        let results = q.fetch_all(&self.pool).await?;
1165        Ok(results)
1166    }
1167
1168    /// Get reality level staleness
1169    ///
1170    /// # Errors
1171    ///
1172    /// Returns an error if the database query fails.
1173    pub async fn get_reality_level_staleness(
1174        &self,
1175        workspace_id: Option<&str>,
1176        org_id: Option<&str>,
1177        max_staleness_days: Option<i32>,
1178    ) -> Result<Vec<RealityLevelStaleness>> {
1179        let mut query = String::from(
1180            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
1181             FROM reality_level_staleness
1182             WHERE 1=1"
1183        );
1184
1185        if workspace_id.is_some() {
1186            query.push_str(" AND workspace_id = ?");
1187        }
1188        if org_id.is_some() {
1189            query.push_str(" AND org_id = ?");
1190        }
1191        if max_staleness_days.is_some() {
1192            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
1193        }
1194        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");
1195
1196        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
1197        if let Some(ws_id) = workspace_id {
1198            q = q.bind(ws_id);
1199        }
1200        if let Some(o_id) = org_id {
1201            q = q.bind(o_id);
1202        }
1203        if let Some(max_days) = max_staleness_days {
1204            q = q.bind(max_days);
1205        }
1206
1207        let results = q.fetch_all(&self.pool).await?;
1208        Ok(results)
1209    }
1210
1211    /// Get drift percentage metrics
1212    ///
1213    /// # Errors
1214    ///
1215    /// Returns an error if the database query fails.
1216    pub async fn get_drift_percentage(
1217        &self,
1218        workspace_id: Option<&str>,
1219        org_id: Option<&str>,
1220        limit: Option<i64>,
1221    ) -> Result<Vec<DriftPercentageMetrics>> {
1222        let limit = limit.unwrap_or(100);
1223        let mut query = String::from(
1224            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
1225             FROM drift_percentage_metrics
1226             WHERE 1=1"
1227        );
1228
1229        if workspace_id.is_some() {
1230            query.push_str(" AND workspace_id = ?");
1231        }
1232        if org_id.is_some() {
1233            query.push_str(" AND org_id = ?");
1234        }
1235        query.push_str(" ORDER BY measured_at DESC LIMIT ?");
1236
1237        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
1238        if let Some(ws_id) = workspace_id {
1239            q = q.bind(ws_id);
1240        }
1241        if let Some(o_id) = org_id {
1242            q = q.bind(o_id);
1243        }
1244        q = q.bind(limit);
1245
1246        let results = q.fetch_all(&self.pool).await?;
1247        Ok(results)
1248    }
1249}
1250
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening an in-memory database and applying migrations must succeed.
    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    /// A fully populated minute aggregate inserts and yields a positive row id.
    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let now = chrono::Utc::now().timestamp();
        let agg = MetricsAggregate {
            id: None,
            timestamp: now,
            protocol: String::from("HTTP"),
            method: Some(String::from("GET")),
            endpoint: Some(String::from("/api/test")),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10_000,
            bytes_received: 5_000,
            active_connections: Some(10),
            created_at: None,
        };

        let row_id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(row_id > 0);
    }
}