mockforge_analytics/database.rs

//! Database layer for analytics storage

use crate::error::{AnalyticsError, Result};
use crate::models::*;
use futures::TryStreamExt;
use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
use std::path::Path;
use tracing::{debug, error, info};

/// Analytics database manager
#[derive(Clone)]
pub struct AnalyticsDatabase {
    pool: Pool<Sqlite>,
}

impl AnalyticsDatabase {
    /// Create a new analytics database connection
    ///
    /// # Arguments
    /// * `database_path` - Path to the SQLite database file (or ":memory:" for in-memory)
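    ///
    /// # Example
    ///
    /// A minimal sketch (assuming the crate is published as `mockforge_analytics`
    /// and this module is reachable as `mockforge_analytics::database`):
    ///
    /// ```no_run
    /// # use mockforge_analytics::database::AnalyticsDatabase;
    /// # async fn demo() -> mockforge_analytics::error::Result<()> {
    /// let db = AnalyticsDatabase::new(std::path::Path::new("analytics.db")).await?;
    /// db.run_migrations().await?;
    /// # Ok(())
    /// # }
    /// ```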
    pub async fn new(database_path: &Path) -> Result<Self> {
        let db_url = if database_path.to_str() == Some(":memory:") {
            "sqlite::memory:".to_string()
        } else {
            format!("sqlite://{}", database_path.display())
        };

        info!("Connecting to analytics database: {}", db_url);

        let pool = SqlitePoolOptions::new()
            .max_connections(10)
            .connect(&db_url)
            .await
            .map_err(|e| {
                error!("Failed to connect to analytics database: {}", e);
                AnalyticsError::Database(e)
            })?;
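
        // Note: with `sqlite::memory:`, each pooled connection opens its own
        // private in-memory database, so schema and data are not shared
        // across the pool's connections.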

        // Enable WAL mode for better concurrency
        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;

        // Enable foreign keys
        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;

        Ok(Self { pool })
    }

    /// Run database migrations
    pub async fn run_migrations(&self) -> Result<()> {
        info!("Running analytics database migrations");

        let migration_sql = include_str!("../migrations/001_analytics_schema.sql");

        // Use execute_many which handles multiple statements
        let mut conn = self.pool.acquire().await?;

        let mut stream = conn.execute_many(migration_sql);

        // Consume the stream to execute all statements
        while stream
            .try_next()
            .await
            .map_err(|e| {
                error!("Migration error: {}", e);
                AnalyticsError::Migration(format!("Failed to execute migration: {}", e))
            })?
            .is_some()
        {}

        info!("Analytics database migrations completed successfully");
        Ok(())
    }

    /// Get a reference to the database pool
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }

    // ========================================================================
    // Insert Operations
    // ========================================================================

    /// Insert a minute-level metrics aggregate
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert multiple minute-level aggregates in a batch
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        if aggregates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }

    /// Insert an hour-level metrics aggregate
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert a day-level metrics aggregate
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert or update endpoint statistics
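    ///
    /// Note: SQLite applies the `ON CONFLICT` clause below only if the schema
    /// (from the migration file) defines a unique index matching the
    /// COALESCE-based conflict target.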
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Insert an error event
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    /// Insert a traffic pattern
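    ///
    /// Uses the same COALESCE-based conflict-target pattern as
    /// `upsert_endpoint_stats`, with the matching unique index assumed
    /// to exist in the schema.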
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Insert an analytics snapshot
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

    // ========================================================================
    // Query Operations
    // ========================================================================

    /// Get minute aggregates for a time range
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        // Build the query with bound parameters
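        // (bind order below must mirror the order in which the `?` placeholders
        // were appended above; SQLite matches positional parameters by sequence)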
        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Get top endpoints by request count
    pub async fn get_top_endpoints(
        &self,
        limit: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<EndpointStats>> {
        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY total_requests DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);

        if let Some(workspace) = workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Get recent error events
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

    /// Get traffic patterns for heatmap
    pub async fn get_traffic_patterns(
        &self,
        days: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<TrafficPattern>> {
        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
        let start_date_str = start_date.format("%Y-%m-%d").to_string();

        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY date ASC, hour ASC");

        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);

        if let Some(workspace) = workspace_id {
            query_builder = query_builder.bind(workspace);
        }

        let results = query_builder.fetch_all(&self.pool).await?;

        Ok(results)
    }

    // ========================================================================
    // Cleanup Operations
    // ========================================================================

    /// Delete old minute aggregates
    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (days as i64 * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!(
            "Cleaned up {} minute aggregates older than {} days",
            result.rows_affected(),
            days
        );
        Ok(result.rows_affected())
    }

    /// Delete old hour aggregates
    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (days as i64 * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

    /// Delete old error events
    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (days as i64 * 86400);

        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

    /// Vacuum the database to reclaim space
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        };

        let id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(id > 0);
    }
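
    // A further sketch: exercises the batch-insert and cleanup paths using
    // only the APIs defined above; field values mirror the test above.
    #[tokio::test]
    async fn test_insert_minute_aggregates_batch() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        };

        // An empty slice is a no-op; a one-element batch runs inside a transaction.
        db.insert_minute_aggregates_batch(&[]).await.unwrap();
        db.insert_minute_aggregates_batch(&[agg]).await.unwrap();

        // With no rows older than the cutoff, cleanup should delete nothing.
        assert_eq!(db.cleanup_minute_aggregates(30).await.unwrap(), 0);
    }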
}