// llm_analytics_hub/database/mod.rs

1//! Database Interaction Layer
2//!
3//! Provides high-performance database operations with connection pooling,
4//! prepared statements, and transaction management for TimescaleDB.
5
6use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::postgres::{PgPool, PgPoolOptions};
10use sqlx::{FromRow, Row};
11use std::time::Duration;
12use tracing::{info, instrument};
13use uuid::Uuid;
14
15pub mod queries;
16pub mod schema;
17
18use crate::schemas::events::AnalyticsEvent;
19use crate::models::metrics::{StatisticalMeasures, TimeWindow};
20
21/// Database configuration
22#[derive(Debug, Clone, Deserialize)]
23pub struct DatabaseConfig {
24    pub host: String,
25    pub port: u16,
26    pub database: String,
27    pub username: String,
28    pub password: String,
29    pub max_connections: u32,
30    pub min_connections: u32,
31    pub connection_timeout: u64,
32    pub idle_timeout: u64,
33    pub max_lifetime: u64,
34}
35
36impl Default for DatabaseConfig {
37    fn default() -> Self {
38        Self {
39            host: "localhost".to_string(),
40            port: 5432,
41            database: "llm_analytics".to_string(),
42            username: "postgres".to_string(),
43            password: "postgres".to_string(),
44            max_connections: 50,
45            min_connections: 5,
46            connection_timeout: 30,
47            idle_timeout: 600,
48            max_lifetime: 1800,
49        }
50    }
51}
52
53impl DatabaseConfig {
54    /// Build a PostgreSQL connection string
55    pub fn connection_string(&self) -> String {
56        format!(
57            "postgres://{}:{}@{}:{}/{}",
58            self.username, self.password, self.host, self.port, self.database
59        )
60    }
61}
62
63/// Database client with connection pooling
/// Database client with connection pooling
///
/// Thin wrapper around an `sqlx` PostgreSQL pool; every query method runs
/// against the shared pool held here.
pub struct Database {
    // Connection pool created in `Database::new` / `Database::from_url`.
    pool: PgPool,
}
67
68impl Database {
69    /// Create a new database client with connection pool
70    #[instrument(skip(config))]
71    pub async fn new(config: DatabaseConfig) -> Result<Self> {
72        info!("Initializing database connection pool");
73
74        let pool = PgPoolOptions::new()
75            .max_connections(config.max_connections)
76            .min_connections(config.min_connections)
77            .acquire_timeout(Duration::from_secs(config.connection_timeout))
78            .idle_timeout(Duration::from_secs(config.idle_timeout))
79            .max_lifetime(Duration::from_secs(config.max_lifetime))
80            .connect(&config.connection_string())
81            .await
82            .context("Failed to create database connection pool")?;
83
84        // Test the connection
85        sqlx::query("SELECT 1")
86            .execute(&pool)
87            .await
88            .context("Failed to test database connection")?;
89
90        info!("Database connection pool initialized successfully");
91
92        Ok(Self { pool })
93    }
94
95    /// Create a new database client from a connection URL
96    #[instrument(skip(url))]
97    pub async fn from_url(url: &str) -> Result<Self> {
98        info!("Initializing database connection pool from URL");
99
100        let pool = PgPoolOptions::new()
101            .max_connections(50)
102            .min_connections(5)
103            .acquire_timeout(Duration::from_secs(30))
104            .idle_timeout(Duration::from_secs(600))
105            .max_lifetime(Duration::from_secs(1800))
106            .connect(url)
107            .await
108            .context("Failed to create database connection pool")?;
109
110        // Test the connection
111        sqlx::query("SELECT 1")
112            .execute(&pool)
113            .await
114            .context("Failed to test database connection")?;
115
116        info!("Database connection pool initialized successfully");
117
118        Ok(Self { pool })
119    }
120
121    /// Get a reference to the connection pool
122    pub fn pool(&self) -> &PgPool {
123        &self.pool
124    }
125
126    /// Close the database connection pool
127    pub async fn close(&self) {
128        self.pool.close().await;
129    }
130
131    // ========== Event Operations ==========
132
133    /// Insert a single analytics event
134    #[instrument(skip(self, event))]
135    pub async fn insert_event(&self, event: &AnalyticsEvent) -> Result<Uuid> {
136        let event_json = serde_json::to_value(event)
137            .context("Failed to serialize event")?;
138
139        let row = sqlx::query(
140            r#"
141            INSERT INTO events (
142                event_id, timestamp, source_module, event_type,
143                correlation_id, parent_event_id, schema_version,
144                severity, environment, tags, payload
145            )
146            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
147            RETURNING event_id
148            "#
149        )
150        .bind(event.common.event_id)
151        .bind(event.common.timestamp)
152        .bind(serde_json::to_value(&event.common.source_module)?)
153        .bind(serde_json::to_value(&event.common.event_type)?)
154        .bind(event.common.correlation_id)
155        .bind(event.common.parent_event_id)
156        .bind(&event.common.schema_version)
157        .bind(serde_json::to_value(&event.common.severity)?)
158        .bind(&event.common.environment)
159        .bind(serde_json::to_value(&event.common.tags)?)
160        .bind(event_json)
161        .fetch_one(&self.pool)
162        .await
163        .context("Failed to insert event")?;
164
165        Ok(row.try_get("event_id")?)
166    }
167
168    /// Batch insert analytics events for high throughput
169    #[instrument(skip(self, events))]
170    pub async fn insert_events_batch(&self, events: &[AnalyticsEvent]) -> Result<u64> {
171        if events.is_empty() {
172            return Ok(0);
173        }
174
175        let mut tx = self.pool.begin().await?;
176        let mut inserted = 0u64;
177
178        // Use COPY for maximum performance with large batches
179        if events.len() > 100 {
180            // Build bulk insert query
181            let mut query_builder = sqlx::QueryBuilder::new(
182                "INSERT INTO events (event_id, timestamp, source_module, event_type, \
183                 correlation_id, parent_event_id, schema_version, severity, environment, \
184                 tags, payload) "
185            );
186
187            query_builder.push_values(events, |mut b, event| {
188                let event_json = serde_json::to_value(event).unwrap();
189                b.push_bind(event.common.event_id)
190                    .push_bind(event.common.timestamp)
191                    .push_bind(serde_json::to_value(&event.common.source_module).unwrap())
192                    .push_bind(serde_json::to_value(&event.common.event_type).unwrap())
193                    .push_bind(event.common.correlation_id)
194                    .push_bind(event.common.parent_event_id)
195                    .push_bind(&event.common.schema_version)
196                    .push_bind(serde_json::to_value(&event.common.severity).unwrap())
197                    .push_bind(&event.common.environment)
198                    .push_bind(serde_json::to_value(&event.common.tags).unwrap())
199                    .push_bind(event_json);
200            });
201
202            let result = query_builder.build().execute(&mut *tx).await?;
203            inserted = result.rows_affected();
204        } else {
205            // Use individual inserts for smaller batches
206            for event in events {
207                self.insert_event(event).await?;
208                inserted += 1;
209            }
210        }
211
212        tx.commit().await?;
213        Ok(inserted)
214    }
215
216    /// Query events by time range
217    #[instrument(skip(self))]
218    pub async fn query_events(
219        &self,
220        start: DateTime<Utc>,
221        end: DateTime<Utc>,
222        limit: Option<i64>,
223    ) -> Result<Vec<AnalyticsEvent>> {
224        let limit = limit.unwrap_or(1000);
225
226        let rows = sqlx::query(
227            r#"
228            SELECT payload
229            FROM events
230            WHERE timestamp >= $1 AND timestamp < $2
231            ORDER BY timestamp DESC
232            LIMIT $3
233            "#
234        )
235        .bind(start)
236        .bind(end)
237        .bind(limit)
238        .fetch_all(&self.pool)
239        .await
240        .context("Failed to query events")?;
241
242        let events: Vec<AnalyticsEvent> = rows
243            .into_iter()
244            .filter_map(|row| {
245                let payload: serde_json::Value = row.try_get("payload").ok()?;
246                serde_json::from_value(payload).ok()
247            })
248            .collect();
249
250        Ok(events)
251    }
252
253    /// Query events by correlation ID
254    #[instrument(skip(self))]
255    pub async fn query_events_by_correlation(
256        &self,
257        correlation_id: Uuid,
258    ) -> Result<Vec<AnalyticsEvent>> {
259        let rows = sqlx::query(
260            r#"
261            SELECT payload
262            FROM events
263            WHERE correlation_id = $1
264            ORDER BY timestamp ASC
265            "#
266        )
267        .bind(correlation_id)
268        .fetch_all(&self.pool)
269        .await
270        .context("Failed to query events by correlation")?;
271
272        let events: Vec<AnalyticsEvent> = rows
273            .into_iter()
274            .filter_map(|row| {
275                let payload: serde_json::Value = row.try_get("payload").ok()?;
276                serde_json::from_value(payload).ok()
277            })
278            .collect();
279
280        Ok(events)
281    }
282
283    // ========== Metrics Operations ==========
284
285    /// Store aggregated metrics
286    #[instrument(skip(self))]
287    pub async fn store_aggregated_metric(
288        &self,
289        metric_name: &str,
290        time_window: TimeWindow,
291        window_start: DateTime<Utc>,
292        tags: &serde_json::Value,
293        measures: &StatisticalMeasures,
294    ) -> Result<()> {
295        sqlx::query(
296            r#"
297            INSERT INTO aggregated_metrics (
298                metric_name, time_window, window_start, tags,
299                avg, min, max, p50, p95, p99, stddev, count, sum
300            )
301            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
302            ON CONFLICT (metric_name, time_window, window_start, tags)
303            DO UPDATE SET
304                avg = EXCLUDED.avg,
305                min = EXCLUDED.min,
306                max = EXCLUDED.max,
307                p50 = EXCLUDED.p50,
308                p95 = EXCLUDED.p95,
309                p99 = EXCLUDED.p99,
310                stddev = EXCLUDED.stddev,
311                count = EXCLUDED.count,
312                sum = EXCLUDED.sum
313            "#
314        )
315        .bind(metric_name)
316        .bind(time_window.as_str())
317        .bind(window_start)
318        .bind(tags)
319        .bind(measures.avg)
320        .bind(measures.min)
321        .bind(measures.max)
322        .bind(measures.p50)
323        .bind(measures.p95)
324        .bind(measures.p99)
325        .bind(measures.stddev)
326        .bind(measures.count as i64)
327        .bind(measures.sum)
328        .execute(&self.pool)
329        .await
330        .context("Failed to store aggregated metric")?;
331
332        Ok(())
333    }
334
335    /// Query aggregated metrics
336    #[instrument(skip(self))]
337    pub async fn query_aggregated_metrics(
338        &self,
339        metric_name: &str,
340        time_window: TimeWindow,
341        start: DateTime<Utc>,
342        end: DateTime<Utc>,
343    ) -> Result<Vec<AggregatedMetricRow>> {
344        let rows = sqlx::query_as::<_, AggregatedMetricRow>(
345            r#"
346            SELECT
347                metric_name, time_window, window_start, tags,
348                avg, min, max, p50, p95, p99, stddev, count, sum
349            FROM aggregated_metrics
350            WHERE metric_name = $1
351              AND time_window = $2
352              AND window_start >= $3
353              AND window_start < $4
354            ORDER BY window_start ASC
355            "#
356        )
357        .bind(metric_name)
358        .bind(time_window.as_str())
359        .bind(start)
360        .bind(end)
361        .fetch_all(&self.pool)
362        .await
363        .context("Failed to query aggregated metrics")?;
364
365        Ok(rows)
366    }
367
368    // ========== Anomaly Operations ==========
369
370    /// Store detected anomaly
371    #[instrument(skip(self))]
372    pub async fn store_anomaly(
373        &self,
374        anomaly_id: Uuid,
375        detected_at: DateTime<Utc>,
376        metric_name: &str,
377        anomaly_type: &str,
378        severity: &str,
379        value: f64,
380        expected_value: Option<f64>,
381        confidence_score: f64,
382        context: &serde_json::Value,
383    ) -> Result<Uuid> {
384        let result = sqlx::query(
385            r#"
386            INSERT INTO anomalies (
387                anomaly_id, detected_at, metric_name, anomaly_type,
388                severity, value, expected_value, confidence_score, context
389            )
390            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
391            RETURNING anomaly_id
392            "#
393        )
394        .bind(anomaly_id)
395        .bind(detected_at)
396        .bind(metric_name)
397        .bind(anomaly_type)
398        .bind(severity)
399        .bind(value)
400        .bind(expected_value)
401        .bind(confidence_score)
402        .bind(context)
403        .fetch_one(&self.pool)
404        .await
405        .context("Failed to store anomaly")?;
406
407        Ok(result.try_get("anomaly_id")?)
408    }
409
410    /// Query recent anomalies
411    #[instrument(skip(self))]
412    pub async fn query_recent_anomalies(
413        &self,
414        since: DateTime<Utc>,
415        limit: Option<i64>,
416    ) -> Result<Vec<AnomalyRow>> {
417        let limit = limit.unwrap_or(100);
418
419        let rows = sqlx::query_as::<_, AnomalyRow>(
420            r#"
421            SELECT
422                anomaly_id, detected_at, metric_name, anomaly_type,
423                severity, value, expected_value, confidence_score, context
424            FROM anomalies
425            WHERE detected_at >= $1
426            ORDER BY detected_at DESC
427            LIMIT $2
428            "#
429        )
430        .bind(since)
431        .bind(limit)
432        .fetch_all(&self.pool)
433        .await
434        .context("Failed to query anomalies")?;
435
436        Ok(rows)
437    }
438
439    // ========== Correlation Operations ==========
440
441    /// Store event correlation
442    #[instrument(skip(self))]
443    pub async fn store_correlation(
444        &self,
445        correlation_id: Uuid,
446        correlation_type: &str,
447        source_event_id: Uuid,
448        target_event_id: Uuid,
449        strength: f64,
450        metadata: &serde_json::Value,
451    ) -> Result<Uuid> {
452        let result = sqlx::query(
453            r#"
454            INSERT INTO correlations (
455                correlation_id, correlation_type, source_event_id,
456                target_event_id, strength, metadata, created_at
457            )
458            VALUES ($1, $2, $3, $4, $5, $6, NOW())
459            RETURNING correlation_id
460            "#
461        )
462        .bind(correlation_id)
463        .bind(correlation_type)
464        .bind(source_event_id)
465        .bind(target_event_id)
466        .bind(strength)
467        .bind(metadata)
468        .fetch_one(&self.pool)
469        .await
470        .context("Failed to store correlation")?;
471
472        Ok(result.try_get("correlation_id")?)
473    }
474
475    // ========== Health Check ==========
476
477    /// Check database health
478    pub async fn health_check(&self) -> Result<DatabaseHealth> {
479        let result = sqlx::query(
480            r#"
481            SELECT
482                (SELECT COUNT(*) FROM pg_stat_activity WHERE state = 'active') as active_connections,
483                (SELECT COUNT(*) FROM events WHERE timestamp > NOW() - INTERVAL '1 minute') as recent_events,
484                (SELECT pg_database_size(current_database())) as database_size
485            "#
486        )
487        .fetch_one(&self.pool)
488        .await?;
489
490        Ok(DatabaseHealth {
491            active_connections: result.try_get::<Option<i64>, _>("active_connections")?.unwrap_or(0),
492            recent_events: result.try_get::<Option<i64>, _>("recent_events")?.unwrap_or(0),
493            database_size_bytes: result.try_get::<Option<i64>, _>("database_size")?.unwrap_or(0),
494        })
495    }
496}
497
498// ========== Database Types ==========
499
/// One row of the `aggregated_metrics` table: pre-computed statistics for a
/// metric over a single time window.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct AggregatedMetricRow {
    /// Metric identifier (e.g. a latency or token-count series name).
    pub metric_name: String,
    /// Window granularity label, as produced by `TimeWindow::as_str()`.
    pub time_window: String,
    /// Inclusive start of the aggregation window (UTC).
    pub window_start: DateTime<Utc>,
    /// Dimension tags; part of the table's conflict key alongside the
    /// metric name and window.
    pub tags: serde_json::Value,
    /// Mean of the samples in the window.
    pub avg: f64,
    /// Smallest sample in the window.
    pub min: f64,
    /// Largest sample in the window.
    pub max: f64,
    /// Median (50th percentile).
    pub p50: f64,
    /// 95th percentile.
    pub p95: f64,
    /// 99th percentile.
    pub p99: f64,
    /// Standard deviation; NULL in the DB (hence Option) — presumably when
    /// the window has too few samples to compute one. TODO confirm upstream.
    pub stddev: Option<f64>,
    /// Number of samples aggregated into this row.
    pub count: i64,
    /// Sum of all samples in the window.
    pub sum: f64,
}
516
/// One row of the `anomalies` table: a single detected deviation for a
/// metric, as stored by `Database::store_anomaly`.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct AnomalyRow {
    /// Unique identifier of this anomaly record.
    pub anomaly_id: Uuid,
    /// When the detector flagged the anomaly (UTC).
    pub detected_at: DateTime<Utc>,
    /// Metric the anomaly was observed on.
    pub metric_name: String,
    /// Detector-specific category (stored as free-form text).
    pub anomaly_type: String,
    /// Severity label (stored as free-form text).
    pub severity: String,
    /// Observed metric value that triggered detection.
    pub value: f64,
    /// Value the detector expected, when it produces one.
    pub expected_value: Option<f64>,
    /// Detector confidence in this anomaly.
    pub confidence_score: f64,
    /// Arbitrary JSON context captured at detection time.
    pub context: serde_json::Value,
}
529
/// Snapshot returned by `Database::health_check`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseHealth {
    // Rows in pg_stat_activity with state = 'active' at sample time.
    pub active_connections: i64,
    // Events inserted within the last minute (server clock).
    pub recent_events: i64,
    // Total size of the current database, per pg_database_size().
    pub database_size_bytes: i64,
}
536
#[cfg(test)]
mod tests {
    use super::*;

    /// `connection_string` is pure and synchronous, so a plain `#[test]`
    /// suffices — the previous `#[tokio::test]` spun up an async runtime
    /// for no reason.
    #[test]
    fn test_database_config_connection_string() {
        let config = DatabaseConfig::default();
        let conn_str = config.connection_string();
        // Scheme, credentials, host:port, and database name all present.
        assert!(conn_str.starts_with("postgres://"));
        assert!(conn_str.contains("postgres:postgres@"));
        assert!(conn_str.contains("localhost:5432"));
        assert!(conn_str.ends_with("/llm_analytics"));
    }
}