// mockforge_analytics — src/database.rs
1//! Database layer for analytics storage
2
3use crate::error::{AnalyticsError, Result};
4use crate::models::{
5    AnalyticsFilter, AnalyticsSnapshot, DayMetricsAggregate, DriftPercentageMetrics,
6    EndpointCoverage, EndpointStats, ErrorEvent, HourMetricsAggregate, MetricsAggregate,
7    PersonaCIHit, RealityLevelStaleness, ScenarioUsageMetrics, TrafficPattern,
8};
9use futures::TryStreamExt;
10use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
11use std::path::Path;
12use tracing::{debug, error, info};
13
/// Analytics database manager
///
/// Wraps the `SQLite` connection pool used by every insert/query/cleanup
/// method in this module. `Clone` is derived so handles can be shared
/// across tasks; cloning duplicates the pool handle, not the connections.
#[derive(Clone)]
pub struct AnalyticsDatabase {
    // Connection pool all operations route through (see `Self::pool()`).
    pool: Pool<Sqlite>,
}
19
20impl AnalyticsDatabase {
21    /// Create a new analytics database connection
22    ///
23    /// # Arguments
24    /// * `database_path` - Path to the `SQLite` database file (or ":memory:" for in-memory)
25    ///
26    /// # Errors
27    ///
28    /// Returns an error if the database connection cannot be established or
29    /// `SQLite` pragmas fail to execute.
30    pub async fn new(database_path: &Path) -> Result<Self> {
31        let db_url = if database_path.to_str() == Some(":memory:") {
32            "sqlite::memory:".to_string()
33        } else {
34            // Ensure parent directory exists so SQLite can create the file.
35            if let Some(parent) = database_path.parent() {
36                if !parent.as_os_str().is_empty() && !parent.exists() {
37                    std::fs::create_dir_all(parent).map_err(|e| {
38                        error!("Failed to create analytics database directory: {}", e);
39                        AnalyticsError::Database(sqlx::Error::Io(e))
40                    })?;
41                }
42            }
43            format!("sqlite://{}?mode=rwc", database_path.display())
44        };
45
46        info!("Connecting to analytics database: {}", db_url);
47
48        let pool =
49            SqlitePoolOptions::new()
50                .max_connections(10)
51                .connect(&db_url)
52                .await
53                .map_err(|e| {
54                    error!("Failed to connect to analytics database: {}", e);
55                    AnalyticsError::Database(e)
56                })?;
57
58        // Enable WAL mode for better concurrency
59        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;
60
61        // Enable foreign keys
62        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;
63
64        Ok(Self { pool })
65    }
66
67    /// Run database migrations
68    ///
69    /// # Errors
70    ///
71    /// Returns an error if any migration SQL fails to execute.
72    pub async fn run_migrations(&self) -> Result<()> {
73        info!("Running analytics database migrations");
74
75        // Run initial schema migration
76        let migration_sql = include_str!("../migrations/001_analytics_schema.sql");
77        let mut conn = self.pool.acquire().await?;
78        let mut stream = conn.execute_many(migration_sql);
79
80        while stream
81            .try_next()
82            .await
83            .map_err(|e| {
84                error!("Migration error: {}", e);
85                AnalyticsError::Migration(format!("Failed to execute migration: {e}"))
86            })?
87            .is_some()
88        {}
89
90        // Run coverage metrics migration
91        let coverage_migration_sql = include_str!("../migrations/003_coverage_metrics.sql");
92        let mut conn = self.pool.acquire().await?;
93        let mut stream = conn.execute_many(coverage_migration_sql);
94
95        while stream
96            .try_next()
97            .await
98            .map_err(|e| {
99                error!("Coverage metrics migration error: {}", e);
100                AnalyticsError::Migration(format!(
101                    "Failed to execute coverage metrics migration: {e}"
102                ))
103            })?
104            .is_some()
105        {}
106
107        // Run pillar usage migration
108        let pillar_usage_migration_sql = include_str!("../migrations/002_pillar_usage.sql");
109        let mut conn = self.pool.acquire().await?;
110        let mut stream = conn.execute_many(pillar_usage_migration_sql);
111
112        while stream
113            .try_next()
114            .await
115            .map_err(|e| {
116                error!("Pillar usage migration error: {}", e);
117                AnalyticsError::Migration(format!("Failed to execute pillar usage migration: {e}"))
118            })?
119            .is_some()
120        {}
121
122        info!("Analytics database migrations completed successfully");
123        Ok(())
124    }
125
    /// Get a reference to the database pool
    ///
    /// Lets callers run ad-hoc queries against the same connection pool this
    /// manager uses internally.
    #[must_use]
    pub const fn pool(&self) -> &SqlitePool {
        &self.pool
    }
131
132    // ========================================================================
133    // Insert Operations
134    // ========================================================================
135
136    /// Insert a minute-level metrics aggregate
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the database insert fails.
141    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
142        let result = sqlx::query(
143            "INSERT INTO metrics_aggregates_minute (
144                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
145                request_count, error_count, latency_sum, latency_min, latency_max,
146                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
147            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
148        )
149        .bind(agg.timestamp)
150        .bind(&agg.protocol)
151        .bind(&agg.method)
152        .bind(&agg.endpoint)
153        .bind(agg.status_code)
154        .bind(&agg.workspace_id)
155        .bind(&agg.environment)
156        .bind(agg.request_count)
157        .bind(agg.error_count)
158        .bind(agg.latency_sum)
159        .bind(agg.latency_min)
160        .bind(agg.latency_max)
161        .bind(agg.latency_p50)
162        .bind(agg.latency_p95)
163        .bind(agg.latency_p99)
164        .bind(agg.bytes_sent)
165        .bind(agg.bytes_received)
166        .bind(agg.active_connections)
167        .execute(&self.pool)
168        .await?;
169
170        Ok(result.last_insert_rowid())
171    }
172
173    /// Insert multiple minute-level aggregates in a batch
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if any database insert fails; the transaction is rolled back.
178    pub async fn insert_minute_aggregates_batch(
179        &self,
180        aggregates: &[MetricsAggregate],
181    ) -> Result<()> {
182        if aggregates.is_empty() {
183            return Ok(());
184        }
185
186        let mut tx = self.pool.begin().await?;
187
188        for agg in aggregates {
189            sqlx::query(
190                r"
191                INSERT INTO metrics_aggregates_minute (
192                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
193                    request_count, error_count, latency_sum, latency_min, latency_max,
194                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
195                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
196                ",
197            )
198            .bind(agg.timestamp)
199            .bind(&agg.protocol)
200            .bind(&agg.method)
201            .bind(&agg.endpoint)
202            .bind(agg.status_code)
203            .bind(&agg.workspace_id)
204            .bind(&agg.environment)
205            .bind(agg.request_count)
206            .bind(agg.error_count)
207            .bind(agg.latency_sum)
208            .bind(agg.latency_min)
209            .bind(agg.latency_max)
210            .bind(agg.latency_p50)
211            .bind(agg.latency_p95)
212            .bind(agg.latency_p99)
213            .bind(agg.bytes_sent)
214            .bind(agg.bytes_received)
215            .bind(agg.active_connections)
216            .execute(&mut *tx)
217            .await?;
218        }
219
220        tx.commit().await?;
221        debug!("Inserted {} minute aggregates", aggregates.len());
222        Ok(())
223    }
224
225    /// Insert an hour-level metrics aggregate
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if the database insert fails.
230    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
231        let result = sqlx::query(
232            r"
233            INSERT INTO metrics_aggregates_hour (
234                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
235                request_count, error_count, latency_sum, latency_min, latency_max,
236                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
237                active_connections_avg, active_connections_max
238            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
239            ",
240        )
241        .bind(agg.timestamp)
242        .bind(&agg.protocol)
243        .bind(&agg.method)
244        .bind(&agg.endpoint)
245        .bind(agg.status_code)
246        .bind(&agg.workspace_id)
247        .bind(&agg.environment)
248        .bind(agg.request_count)
249        .bind(agg.error_count)
250        .bind(agg.latency_sum)
251        .bind(agg.latency_min)
252        .bind(agg.latency_max)
253        .bind(agg.latency_p50)
254        .bind(agg.latency_p95)
255        .bind(agg.latency_p99)
256        .bind(agg.bytes_sent)
257        .bind(agg.bytes_received)
258        .bind(agg.active_connections_avg)
259        .bind(agg.active_connections_max)
260        .execute(&self.pool)
261        .await?;
262
263        Ok(result.last_insert_rowid())
264    }
265
266    /// Insert a day-level metrics aggregate
267    ///
268    /// # Errors
269    ///
270    /// Returns an error if the database insert fails.
271    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
272        let result = sqlx::query(
273            r"
274            INSERT INTO metrics_aggregates_day (
275                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
276                request_count, error_count, latency_sum, latency_min, latency_max,
277                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
278                active_connections_avg, active_connections_max, unique_clients, peak_hour
279            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
280            ",
281        )
282        .bind(&agg.date)
283        .bind(agg.timestamp)
284        .bind(&agg.protocol)
285        .bind(&agg.method)
286        .bind(&agg.endpoint)
287        .bind(agg.status_code)
288        .bind(&agg.workspace_id)
289        .bind(&agg.environment)
290        .bind(agg.request_count)
291        .bind(agg.error_count)
292        .bind(agg.latency_sum)
293        .bind(agg.latency_min)
294        .bind(agg.latency_max)
295        .bind(agg.latency_p50)
296        .bind(agg.latency_p95)
297        .bind(agg.latency_p99)
298        .bind(agg.bytes_sent)
299        .bind(agg.bytes_received)
300        .bind(agg.active_connections_avg)
301        .bind(agg.active_connections_max)
302        .bind(agg.unique_clients)
303        .bind(agg.peak_hour)
304        .execute(&self.pool)
305        .await?;
306
307        Ok(result.last_insert_rowid())
308    }
309
310    /// Insert or update endpoint statistics
311    ///
312    /// # Errors
313    ///
314    /// Returns an error if the database upsert fails.
315    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
316        sqlx::query(
317            r"
318            INSERT INTO endpoint_stats (
319                endpoint, protocol, method, workspace_id, environment,
320                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
321                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
322                first_seen, last_seen
323            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
324            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
325            DO UPDATE SET
326                total_requests = total_requests + excluded.total_requests,
327                total_errors = total_errors + excluded.total_errors,
328                avg_latency_ms = excluded.avg_latency_ms,
329                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
330                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
331                p95_latency_ms = excluded.p95_latency_ms,
332                status_codes = excluded.status_codes,
333                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
334                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
335                last_seen = excluded.last_seen,
336                updated_at = strftime('%s', 'now')
337            ",
338        )
339        .bind(&stats.endpoint)
340        .bind(&stats.protocol)
341        .bind(&stats.method)
342        .bind(&stats.workspace_id)
343        .bind(&stats.environment)
344        .bind(stats.total_requests)
345        .bind(stats.total_errors)
346        .bind(stats.avg_latency_ms)
347        .bind(stats.min_latency_ms)
348        .bind(stats.max_latency_ms)
349        .bind(stats.p95_latency_ms)
350        .bind(&stats.status_codes)
351        .bind(stats.total_bytes_sent)
352        .bind(stats.total_bytes_received)
353        .bind(stats.first_seen)
354        .bind(stats.last_seen)
355        .execute(&self.pool)
356        .await?;
357
358        Ok(())
359    }
360
361    /// Insert an error event
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if the database insert fails.
366    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
367        let result = sqlx::query(
368            r"
369            INSERT INTO error_events (
370                timestamp, protocol, method, endpoint, status_code,
371                error_type, error_message, error_category,
372                request_id, trace_id, span_id,
373                client_ip, user_agent, workspace_id, environment, metadata
374            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
375            ",
376        )
377        .bind(error.timestamp)
378        .bind(&error.protocol)
379        .bind(&error.method)
380        .bind(&error.endpoint)
381        .bind(error.status_code)
382        .bind(&error.error_type)
383        .bind(&error.error_message)
384        .bind(&error.error_category)
385        .bind(&error.request_id)
386        .bind(&error.trace_id)
387        .bind(&error.span_id)
388        .bind(&error.client_ip)
389        .bind(&error.user_agent)
390        .bind(&error.workspace_id)
391        .bind(&error.environment)
392        .bind(&error.metadata)
393        .execute(&self.pool)
394        .await?;
395
396        Ok(result.last_insert_rowid())
397    }
398
399    /// Insert a traffic pattern
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the database upsert fails.
404    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
405        sqlx::query(
406            r"
407            INSERT INTO traffic_patterns (
408                date, hour, day_of_week, protocol, workspace_id, environment,
409                request_count, error_count, avg_latency_ms, unique_clients
410            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
411            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
412            DO UPDATE SET
413                request_count = request_count + excluded.request_count,
414                error_count = error_count + excluded.error_count,
415                avg_latency_ms = excluded.avg_latency_ms,
416                unique_clients = excluded.unique_clients
417            ",
418        )
419        .bind(&pattern.date)
420        .bind(pattern.hour)
421        .bind(pattern.day_of_week)
422        .bind(&pattern.protocol)
423        .bind(&pattern.workspace_id)
424        .bind(&pattern.environment)
425        .bind(pattern.request_count)
426        .bind(pattern.error_count)
427        .bind(pattern.avg_latency_ms)
428        .bind(pattern.unique_clients)
429        .execute(&self.pool)
430        .await?;
431
432        Ok(())
433    }
434
435    /// Insert an analytics snapshot
436    ///
437    /// # Errors
438    ///
439    /// Returns an error if the database insert fails.
440    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
441        let result = sqlx::query(
442            r"
443            INSERT INTO analytics_snapshots (
444                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
445                active_connections, protocol_stats, top_endpoints,
446                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
447                workspace_id, environment
448            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
449            ",
450        )
451        .bind(snapshot.timestamp)
452        .bind(&snapshot.snapshot_type)
453        .bind(snapshot.total_requests)
454        .bind(snapshot.total_errors)
455        .bind(snapshot.avg_latency_ms)
456        .bind(snapshot.active_connections)
457        .bind(&snapshot.protocol_stats)
458        .bind(&snapshot.top_endpoints)
459        .bind(snapshot.memory_usage_bytes)
460        .bind(snapshot.cpu_usage_percent)
461        .bind(snapshot.thread_count)
462        .bind(snapshot.uptime_seconds)
463        .bind(&snapshot.workspace_id)
464        .bind(&snapshot.environment)
465        .execute(&self.pool)
466        .await?;
467
468        Ok(result.last_insert_rowid())
469    }
470
471    // ========================================================================
472    // Query Operations
473    // ========================================================================
474
475    /// Get minute aggregates for a time range
476    ///
477    /// # Errors
478    ///
479    /// Returns an error if the database query fails.
480    pub async fn get_minute_aggregates(
481        &self,
482        filter: &AnalyticsFilter,
483    ) -> Result<Vec<MetricsAggregate>> {
484        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");
485
486        if filter.start_time.is_some() {
487            query.push_str(" AND timestamp >= ?");
488        }
489        if filter.end_time.is_some() {
490            query.push_str(" AND timestamp <= ?");
491        }
492        if filter.protocol.is_some() {
493            query.push_str(" AND protocol = ?");
494        }
495        if filter.endpoint.is_some() {
496            query.push_str(" AND endpoint = ?");
497        }
498        if filter.method.is_some() {
499            query.push_str(" AND method = ?");
500        }
501        if filter.status_code.is_some() {
502            query.push_str(" AND status_code = ?");
503        }
504        if filter.workspace_id.is_some() {
505            query.push_str(" AND workspace_id = ?");
506        }
507        if filter.environment.is_some() {
508            query.push_str(" AND environment = ?");
509        }
510
511        query.push_str(" ORDER BY timestamp DESC");
512
513        if filter.limit.is_some() {
514            query.push_str(" LIMIT ?");
515        }
516
517        // Build the query with bound parameters
518        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);
519
520        if let Some(start) = filter.start_time {
521            sql_query = sql_query.bind(start);
522        }
523        if let Some(end) = filter.end_time {
524            sql_query = sql_query.bind(end);
525        }
526        if let Some(ref protocol) = filter.protocol {
527            sql_query = sql_query.bind(protocol);
528        }
529        if let Some(ref endpoint) = filter.endpoint {
530            sql_query = sql_query.bind(endpoint);
531        }
532        if let Some(ref method) = filter.method {
533            sql_query = sql_query.bind(method);
534        }
535        if let Some(status) = filter.status_code {
536            sql_query = sql_query.bind(status);
537        }
538        if let Some(ref workspace) = filter.workspace_id {
539            sql_query = sql_query.bind(workspace);
540        }
541        if let Some(ref env) = filter.environment {
542            sql_query = sql_query.bind(env);
543        }
544        if let Some(limit) = filter.limit {
545            sql_query = sql_query.bind(limit);
546        }
547
548        let results = sql_query.fetch_all(&self.pool).await?;
549
550        Ok(results)
551    }
552
553    /// Get hour-level aggregates
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the database query fails.
558    pub async fn get_hour_aggregates(
559        &self,
560        filter: &AnalyticsFilter,
561    ) -> Result<Vec<HourMetricsAggregate>> {
562        let mut query = String::from("SELECT * FROM metrics_aggregates_hour WHERE 1=1");
563
564        if filter.start_time.is_some() {
565            query.push_str(" AND timestamp >= ?");
566        }
567        if filter.end_time.is_some() {
568            query.push_str(" AND timestamp <= ?");
569        }
570        if filter.protocol.is_some() {
571            query.push_str(" AND protocol = ?");
572        }
573        if filter.endpoint.is_some() {
574            query.push_str(" AND endpoint = ?");
575        }
576        if filter.method.is_some() {
577            query.push_str(" AND method = ?");
578        }
579        if filter.status_code.is_some() {
580            query.push_str(" AND status_code = ?");
581        }
582        if filter.workspace_id.is_some() {
583            query.push_str(" AND workspace_id = ?");
584        }
585        if filter.environment.is_some() {
586            query.push_str(" AND environment = ?");
587        }
588
589        query.push_str(" ORDER BY timestamp DESC");
590
591        if filter.limit.is_some() {
592            query.push_str(" LIMIT ?");
593        }
594
595        // Build the query with bound parameters
596        let mut sql_query = sqlx::query_as::<_, HourMetricsAggregate>(&query);
597
598        if let Some(start) = filter.start_time {
599            sql_query = sql_query.bind(start);
600        }
601        if let Some(end) = filter.end_time {
602            sql_query = sql_query.bind(end);
603        }
604        if let Some(ref protocol) = filter.protocol {
605            sql_query = sql_query.bind(protocol);
606        }
607        if let Some(ref endpoint) = filter.endpoint {
608            sql_query = sql_query.bind(endpoint);
609        }
610        if let Some(ref method) = filter.method {
611            sql_query = sql_query.bind(method);
612        }
613        if let Some(status) = filter.status_code {
614            sql_query = sql_query.bind(status);
615        }
616        if let Some(ref workspace) = filter.workspace_id {
617            sql_query = sql_query.bind(workspace);
618        }
619        if let Some(ref env) = filter.environment {
620            sql_query = sql_query.bind(env);
621        }
622        if let Some(limit) = filter.limit {
623            sql_query = sql_query.bind(limit);
624        }
625
626        let results = sql_query.fetch_all(&self.pool).await?;
627
628        Ok(results)
629    }
630
631    /// Get top endpoints by request count
632    ///
633    /// # Errors
634    ///
635    /// Returns an error if the database query fails.
636    pub async fn get_top_endpoints(
637        &self,
638        limit: i64,
639        workspace_id: Option<&str>,
640    ) -> Result<Vec<EndpointStats>> {
641        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");
642
643        if workspace_id.is_some() {
644            query.push_str(" AND workspace_id = ?");
645        }
646
647        query.push_str(" ORDER BY total_requests DESC LIMIT ?");
648
649        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);
650
651        if let Some(workspace) = workspace_id {
652            sql_query = sql_query.bind(workspace);
653        }
654
655        sql_query = sql_query.bind(limit);
656
657        let results = sql_query.fetch_all(&self.pool).await?;
658
659        Ok(results)
660    }
661
662    /// Get recent error events
663    ///
664    /// # Errors
665    ///
666    /// Returns an error if the database query fails.
667    pub async fn get_recent_errors(
668        &self,
669        limit: i64,
670        filter: &AnalyticsFilter,
671    ) -> Result<Vec<ErrorEvent>> {
672        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");
673
674        if filter.start_time.is_some() {
675            query.push_str(" AND timestamp >= ?");
676        }
677        if filter.end_time.is_some() {
678            query.push_str(" AND timestamp <= ?");
679        }
680        if filter.endpoint.is_some() {
681            query.push_str(" AND endpoint = ?");
682        }
683        if filter.workspace_id.is_some() {
684            query.push_str(" AND workspace_id = ?");
685        }
686
687        query.push_str(" ORDER BY timestamp DESC LIMIT ?");
688
689        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);
690
691        if let Some(start) = filter.start_time {
692            sql_query = sql_query.bind(start);
693        }
694        if let Some(end) = filter.end_time {
695            sql_query = sql_query.bind(end);
696        }
697        if let Some(ref endpoint) = filter.endpoint {
698            sql_query = sql_query.bind(endpoint);
699        }
700        if let Some(ref workspace) = filter.workspace_id {
701            sql_query = sql_query.bind(workspace);
702        }
703
704        sql_query = sql_query.bind(limit);
705
706        let results = sql_query.fetch_all(&self.pool).await?;
707
708        Ok(results)
709    }
710
711    /// Get traffic patterns for heatmap
712    ///
713    /// # Errors
714    ///
715    /// Returns an error if the database query fails.
716    pub async fn get_traffic_patterns(
717        &self,
718        days: i64,
719        workspace_id: Option<&str>,
720    ) -> Result<Vec<TrafficPattern>> {
721        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
722        let start_date_str = start_date.format("%Y-%m-%d").to_string();
723
724        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");
725
726        if workspace_id.is_some() {
727            query.push_str(" AND workspace_id = ?");
728        }
729
730        query.push_str(" ORDER BY date ASC, hour ASC");
731
732        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);
733
734        if let Some(workspace) = workspace_id {
735            query_builder = query_builder.bind(workspace);
736        }
737
738        let results = query_builder.fetch_all(&self.pool).await?;
739
740        Ok(results)
741    }
742
743    // ========================================================================
744    // Cleanup Operations
745    // ========================================================================
746
747    /// Delete old minute aggregates
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if the database delete fails.
752    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
753        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
754
755        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
756            .bind(cutoff)
757            .execute(&self.pool)
758            .await?;
759
760        info!(
761            "Cleaned up {} minute aggregates older than {} days",
762            result.rows_affected(),
763            days
764        );
765        Ok(result.rows_affected())
766    }
767
768    /// Delete old hour aggregates
769    ///
770    /// # Errors
771    ///
772    /// Returns an error if the database delete fails.
773    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
774        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
775
776        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
777            .bind(cutoff)
778            .execute(&self.pool)
779            .await?;
780
781        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
782        Ok(result.rows_affected())
783    }
784
785    /// Delete old error events
786    ///
787    /// # Errors
788    ///
789    /// Returns an error if the database delete fails.
790    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
791        let cutoff = chrono::Utc::now().timestamp() - (i64::from(days) * 86400);
792
793        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
794            .bind(cutoff)
795            .execute(&self.pool)
796            .await?;
797
798        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
799        Ok(result.rows_affected())
800    }
801
802    /// Vacuum the database to reclaim space
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if the VACUUM command fails.
807    pub async fn vacuum(&self) -> Result<()> {
808        info!("Running VACUUM on analytics database");
809        sqlx::query("VACUUM").execute(&self.pool).await?;
810        info!("VACUUM completed");
811        Ok(())
812    }
813}
814
815// ============================================================================
816// Coverage Metrics Operations (MockOps)
817// ============================================================================
818
819impl AnalyticsDatabase {
820    /// Record scenario usage
821    ///
822    /// # Errors
823    ///
824    /// Returns an error if the database operation fails.
825    pub async fn record_scenario_usage(
826        &self,
827        scenario_id: &str,
828        workspace_id: Option<&str>,
829        org_id: Option<&str>,
830    ) -> Result<()> {
831        let now = chrono::Utc::now().timestamp();
832
833        // SQLite doesn't support ON CONFLICT with multiple columns easily, so use INSERT OR REPLACE
834        // First try to update existing record
835        let rows_affected = sqlx::query(
836            "UPDATE scenario_usage_metrics
837             SET usage_count = usage_count + 1,
838                 last_used_at = ?,
839                 updated_at = ?
840             WHERE scenario_id = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
841               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
842        )
843        .bind(now)
844        .bind(now)
845        .bind(scenario_id)
846        .bind(workspace_id)
847        .bind(workspace_id)
848        .bind(org_id)
849        .bind(org_id)
850        .execute(&self.pool)
851        .await?;
852
853        // If no rows were updated, insert a new record
854        if rows_affected.rows_affected() == 0 {
855            sqlx::query(
856                "INSERT INTO scenario_usage_metrics (scenario_id, workspace_id, org_id, usage_count, last_used_at, created_at, updated_at)
857                 VALUES (?, ?, ?, 1, ?, ?, ?)"
858            )
859            .bind(scenario_id)
860            .bind(workspace_id)
861            .bind(org_id)
862            .bind(now)
863            .bind(now)
864            .bind(now)
865            .execute(&self.pool)
866            .await?;
867        }
868
869        Ok(())
870    }
871
872    /// Record persona CI hit
873    ///
874    /// # Errors
875    ///
876    /// Returns an error if the database insert fails.
877    pub async fn record_persona_ci_hit(
878        &self,
879        persona_id: &str,
880        workspace_id: Option<&str>,
881        org_id: Option<&str>,
882        ci_run_id: Option<&str>,
883    ) -> Result<()> {
884        let now = chrono::Utc::now().timestamp();
885
886        sqlx::query(
887            "INSERT INTO persona_ci_hits (persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at)
888             VALUES (?, ?, ?, ?, 1, ?)"
889        )
890        .bind(persona_id)
891        .bind(workspace_id)
892        .bind(org_id)
893        .bind(ci_run_id)
894        .bind(now)
895        .execute(&self.pool)
896        .await?;
897
898        Ok(())
899    }
900
901    /// Record endpoint test coverage
902    ///
903    /// # Errors
904    ///
905    /// Returns an error if the database operation fails.
906    #[allow(clippy::too_many_arguments)]
907    pub async fn record_endpoint_coverage(
908        &self,
909        endpoint: &str,
910        method: Option<&str>,
911        protocol: &str,
912        workspace_id: Option<&str>,
913        org_id: Option<&str>,
914        coverage_percentage: Option<f64>,
915    ) -> Result<()> {
916        let now = chrono::Utc::now().timestamp();
917
918        // Try to update existing record
919        let rows_affected = sqlx::query(
920            "UPDATE endpoint_coverage
921             SET test_count = test_count + 1,
922                 last_tested_at = ?,
923                 coverage_percentage = COALESCE(?, coverage_percentage),
924                 updated_at = ?
925             WHERE endpoint = ? AND (method = ? OR (method IS NULL AND ? IS NULL))
926               AND protocol = ? AND (workspace_id = ? OR (workspace_id IS NULL AND ? IS NULL))
927               AND (org_id = ? OR (org_id IS NULL AND ? IS NULL))",
928        )
929        .bind(now)
930        .bind(coverage_percentage)
931        .bind(now)
932        .bind(endpoint)
933        .bind(method)
934        .bind(method)
935        .bind(protocol)
936        .bind(workspace_id)
937        .bind(workspace_id)
938        .bind(org_id)
939        .bind(org_id)
940        .execute(&self.pool)
941        .await?;
942
943        // If no rows were updated, insert a new record
944        if rows_affected.rows_affected() == 0 {
945            sqlx::query(
946                "INSERT INTO endpoint_coverage (endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at)
947                 VALUES (?, ?, ?, ?, ?, 1, ?, ?, ?, ?)"
948            )
949            .bind(endpoint)
950            .bind(method)
951            .bind(protocol)
952            .bind(workspace_id)
953            .bind(org_id)
954            .bind(now)
955            .bind(coverage_percentage)
956            .bind(now)
957            .bind(now)
958            .execute(&self.pool)
959            .await?;
960        }
961
962        Ok(())
963    }
964
965    /// Record reality level staleness
966    ///
967    /// # Errors
968    ///
969    /// Returns an error if the database insert fails.
970    #[allow(clippy::too_many_arguments)]
971    pub async fn record_reality_level_staleness(
972        &self,
973        workspace_id: &str,
974        org_id: Option<&str>,
975        endpoint: Option<&str>,
976        method: Option<&str>,
977        protocol: Option<&str>,
978        current_reality_level: Option<&str>,
979        staleness_days: Option<i32>,
980    ) -> Result<()> {
981        let now = chrono::Utc::now().timestamp();
982        let last_updated = Some(staleness_days.map_or(now, |days| now - (i64::from(days) * 86400)));
983
984        sqlx::query(
985            "INSERT INTO reality_level_staleness (workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at)
986             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
987             ON CONFLICT DO NOTHING"
988        )
989        .bind(workspace_id)
990        .bind(org_id)
991        .bind(endpoint)
992        .bind(method)
993        .bind(protocol)
994        .bind(current_reality_level)
995        .bind(last_updated)
996        .bind(staleness_days)
997        .bind(now)
998        .bind(now)
999        .execute(&self.pool)
1000        .await?;
1001
1002        Ok(())
1003    }
1004
1005    /// Record drift percentage metrics
1006    ///
1007    /// # Errors
1008    ///
1009    /// Returns an error if the database insert fails.
1010    pub async fn record_drift_percentage(
1011        &self,
1012        workspace_id: &str,
1013        org_id: Option<&str>,
1014        total_mocks: i64,
1015        drifting_mocks: i64,
1016    ) -> Result<()> {
1017        let now = chrono::Utc::now().timestamp();
1018        #[allow(clippy::cast_precision_loss)]
1019        let drift_percentage = if total_mocks > 0 {
1020            (drifting_mocks as f64 / total_mocks as f64) * 100.0
1021        } else {
1022            0.0
1023        };
1024
1025        sqlx::query(
1026            "INSERT INTO drift_percentage_metrics (workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at)
1027             VALUES (?, ?, ?, ?, ?, ?)"
1028        )
1029        .bind(workspace_id)
1030        .bind(org_id)
1031        .bind(total_mocks)
1032        .bind(drifting_mocks)
1033        .bind(drift_percentage)
1034        .bind(now)
1035        .execute(&self.pool)
1036        .await?;
1037
1038        Ok(())
1039    }
1040
1041    /// Get scenario usage metrics
1042    ///
1043    /// # Errors
1044    ///
1045    /// Returns an error if the database query fails.
1046    pub async fn get_scenario_usage(
1047        &self,
1048        workspace_id: Option<&str>,
1049        org_id: Option<&str>,
1050        limit: Option<i64>,
1051    ) -> Result<Vec<ScenarioUsageMetrics>> {
1052        let limit = limit.unwrap_or(100);
1053        let mut query = String::from(
1054            "SELECT id, scenario_id, workspace_id, org_id, usage_count, last_used_at, usage_pattern, created_at, updated_at
1055             FROM scenario_usage_metrics
1056             WHERE 1=1"
1057        );
1058
1059        if workspace_id.is_some() {
1060            query.push_str(" AND workspace_id = ?");
1061        }
1062        if org_id.is_some() {
1063            query.push_str(" AND org_id = ?");
1064        }
1065        query.push_str(" ORDER BY usage_count DESC LIMIT ?");
1066
1067        let mut q = sqlx::query_as::<_, ScenarioUsageMetrics>(&query);
1068        if let Some(ws_id) = workspace_id {
1069            q = q.bind(ws_id);
1070        }
1071        if let Some(o_id) = org_id {
1072            q = q.bind(o_id);
1073        }
1074        q = q.bind(limit);
1075
1076        let results = q.fetch_all(&self.pool).await?;
1077        Ok(results)
1078    }
1079
1080    /// Get persona CI hits
1081    ///
1082    /// # Errors
1083    ///
1084    /// Returns an error if the database query fails.
1085    pub async fn get_persona_ci_hits(
1086        &self,
1087        workspace_id: Option<&str>,
1088        org_id: Option<&str>,
1089        limit: Option<i64>,
1090    ) -> Result<Vec<PersonaCIHit>> {
1091        let limit = limit.unwrap_or(100);
1092        let mut query = String::from(
1093            "SELECT id, persona_id, workspace_id, org_id, ci_run_id, hit_count, hit_at, created_at
1094             FROM persona_ci_hits
1095             WHERE 1=1",
1096        );
1097
1098        if workspace_id.is_some() {
1099            query.push_str(" AND workspace_id = ?");
1100        }
1101        if org_id.is_some() {
1102            query.push_str(" AND org_id = ?");
1103        }
1104        query.push_str(" ORDER BY hit_at DESC LIMIT ?");
1105
1106        let mut q = sqlx::query_as::<_, PersonaCIHit>(&query);
1107        if let Some(ws_id) = workspace_id {
1108            q = q.bind(ws_id);
1109        }
1110        if let Some(o_id) = org_id {
1111            q = q.bind(o_id);
1112        }
1113        q = q.bind(limit);
1114
1115        let results = q.fetch_all(&self.pool).await?;
1116        Ok(results)
1117    }
1118
1119    /// Get endpoint coverage
1120    ///
1121    /// # Errors
1122    ///
1123    /// Returns an error if the database query fails.
1124    pub async fn get_endpoint_coverage(
1125        &self,
1126        workspace_id: Option<&str>,
1127        org_id: Option<&str>,
1128        min_coverage: Option<f64>,
1129    ) -> Result<Vec<EndpointCoverage>> {
1130        let mut query = String::from(
1131            "SELECT id, endpoint, method, protocol, workspace_id, org_id, test_count, last_tested_at, coverage_percentage, created_at, updated_at
1132             FROM endpoint_coverage
1133             WHERE 1=1"
1134        );
1135
1136        if workspace_id.is_some() {
1137            query.push_str(" AND workspace_id = ?");
1138        }
1139        if org_id.is_some() {
1140            query.push_str(" AND org_id = ?");
1141        }
1142        if min_coverage.is_some() {
1143            query.push_str(" AND (coverage_percentage IS NULL OR coverage_percentage < ?)");
1144        }
1145        query.push_str(" ORDER BY coverage_percentage ASC NULLS LAST, test_count DESC");
1146
1147        let mut q = sqlx::query_as::<_, EndpointCoverage>(&query);
1148        if let Some(ws_id) = workspace_id {
1149            q = q.bind(ws_id);
1150        }
1151        if let Some(o_id) = org_id {
1152            q = q.bind(o_id);
1153        }
1154        if let Some(min_cov) = min_coverage {
1155            q = q.bind(min_cov);
1156        }
1157
1158        let results = q.fetch_all(&self.pool).await?;
1159        Ok(results)
1160    }
1161
1162    /// Get reality level staleness
1163    ///
1164    /// # Errors
1165    ///
1166    /// Returns an error if the database query fails.
1167    pub async fn get_reality_level_staleness(
1168        &self,
1169        workspace_id: Option<&str>,
1170        org_id: Option<&str>,
1171        max_staleness_days: Option<i32>,
1172    ) -> Result<Vec<RealityLevelStaleness>> {
1173        let mut query = String::from(
1174            "SELECT id, workspace_id, org_id, endpoint, method, protocol, current_reality_level, last_updated_at, staleness_days, created_at, updated_at
1175             FROM reality_level_staleness
1176             WHERE 1=1"
1177        );
1178
1179        if workspace_id.is_some() {
1180            query.push_str(" AND workspace_id = ?");
1181        }
1182        if org_id.is_some() {
1183            query.push_str(" AND org_id = ?");
1184        }
1185        if max_staleness_days.is_some() {
1186            query.push_str(" AND (staleness_days IS NULL OR staleness_days > ?)");
1187        }
1188        query.push_str(" ORDER BY staleness_days DESC NULLS LAST");
1189
1190        let mut q = sqlx::query_as::<_, RealityLevelStaleness>(&query);
1191        if let Some(ws_id) = workspace_id {
1192            q = q.bind(ws_id);
1193        }
1194        if let Some(o_id) = org_id {
1195            q = q.bind(o_id);
1196        }
1197        if let Some(max_days) = max_staleness_days {
1198            q = q.bind(max_days);
1199        }
1200
1201        let results = q.fetch_all(&self.pool).await?;
1202        Ok(results)
1203    }
1204
1205    /// Get drift percentage metrics
1206    ///
1207    /// # Errors
1208    ///
1209    /// Returns an error if the database query fails.
1210    pub async fn get_drift_percentage(
1211        &self,
1212        workspace_id: Option<&str>,
1213        org_id: Option<&str>,
1214        limit: Option<i64>,
1215    ) -> Result<Vec<DriftPercentageMetrics>> {
1216        let limit = limit.unwrap_or(100);
1217        let mut query = String::from(
1218            "SELECT id, workspace_id, org_id, total_mocks, drifting_mocks, drift_percentage, measured_at, created_at
1219             FROM drift_percentage_metrics
1220             WHERE 1=1"
1221        );
1222
1223        if workspace_id.is_some() {
1224            query.push_str(" AND workspace_id = ?");
1225        }
1226        if org_id.is_some() {
1227            query.push_str(" AND org_id = ?");
1228        }
1229        query.push_str(" ORDER BY measured_at DESC LIMIT ?");
1230
1231        let mut q = sqlx::query_as::<_, DriftPercentageMetrics>(&query);
1232        if let Some(ws_id) = workspace_id {
1233            q = q.bind(ws_id);
1234        }
1235        if let Some(o_id) = org_id {
1236            q = q.bind(o_id);
1237        }
1238        q = q.bind(limit);
1239
1240        let results = q.fetch_all(&self.pool).await?;
1241        Ok(results)
1242    }
1243}
1244
#[cfg(test)]
mod tests {
    use super::*;

    /// Opening an in-memory database and applying migrations should succeed.
    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    /// Inserting a minute-level aggregate should return a positive row id.
    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        // A representative HTTP aggregate with all optional metrics populated.
        let sample = MetricsAggregate {
            id: None,
            created_at: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10_000,
            bytes_received: 5_000,
            active_connections: Some(10),
        };

        let row_id = db.insert_minute_aggregate(&sample).await.unwrap();
        assert!(row_id > 0);
    }
}