1use anyhow::{Context, Result};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::postgres::{PgPool, PgPoolOptions};
10use sqlx::{FromRow, Row};
11use std::time::Duration;
12use tracing::{info, instrument};
13use uuid::Uuid;
14
15pub mod queries;
16pub mod schema;
17
18use crate::schemas::events::AnalyticsEvent;
19use crate::models::metrics::{StatisticalMeasures, TimeWindow};
20
/// Connection settings for the analytics Postgres database.
///
/// Deserialized from application configuration. All timeout/lifetime fields
/// are whole seconds — they are converted with `Duration::from_secs` when the
/// pool is built in `Database::new`.
#[derive(Debug, Clone, Deserialize)]
pub struct DatabaseConfig {
    pub host: String,
    pub port: u16,
    // Name of the database to connect to.
    pub database: String,
    pub username: String,
    // NOTE(review): interpolated verbatim into the connection URL by
    // `connection_string` — no percent-encoding is applied.
    pub password: String,
    // Upper bound on pooled connections.
    pub max_connections: u32,
    // Connections kept open even when idle.
    pub min_connections: u32,
    // Seconds to wait when acquiring a connection from the pool.
    pub connection_timeout: u64,
    // Seconds a connection may sit idle before being closed.
    pub idle_timeout: u64,
    // Maximum lifetime of any single pooled connection, in seconds.
    pub max_lifetime: u64,
}
35
36impl Default for DatabaseConfig {
37 fn default() -> Self {
38 Self {
39 host: "localhost".to_string(),
40 port: 5432,
41 database: "llm_analytics".to_string(),
42 username: "postgres".to_string(),
43 password: "postgres".to_string(),
44 max_connections: 50,
45 min_connections: 5,
46 connection_timeout: 30,
47 idle_timeout: 600,
48 max_lifetime: 1800,
49 }
50 }
51}
52
53impl DatabaseConfig {
54 pub fn connection_string(&self) -> String {
56 format!(
57 "postgres://{}:{}@{}:{}/{}",
58 self.username, self.password, self.host, self.port, self.database
59 )
60 }
61}
62
/// Handle to the analytics database: a thin wrapper around a shared
/// sqlx Postgres connection pool.
pub struct Database {
    pool: PgPool,
}
67
68impl Database {
69 #[instrument(skip(config))]
71 pub async fn new(config: DatabaseConfig) -> Result<Self> {
72 info!("Initializing database connection pool");
73
74 let pool = PgPoolOptions::new()
75 .max_connections(config.max_connections)
76 .min_connections(config.min_connections)
77 .acquire_timeout(Duration::from_secs(config.connection_timeout))
78 .idle_timeout(Duration::from_secs(config.idle_timeout))
79 .max_lifetime(Duration::from_secs(config.max_lifetime))
80 .connect(&config.connection_string())
81 .await
82 .context("Failed to create database connection pool")?;
83
84 sqlx::query("SELECT 1")
86 .execute(&pool)
87 .await
88 .context("Failed to test database connection")?;
89
90 info!("Database connection pool initialized successfully");
91
92 Ok(Self { pool })
93 }
94
95 #[instrument(skip(url))]
97 pub async fn from_url(url: &str) -> Result<Self> {
98 info!("Initializing database connection pool from URL");
99
100 let pool = PgPoolOptions::new()
101 .max_connections(50)
102 .min_connections(5)
103 .acquire_timeout(Duration::from_secs(30))
104 .idle_timeout(Duration::from_secs(600))
105 .max_lifetime(Duration::from_secs(1800))
106 .connect(url)
107 .await
108 .context("Failed to create database connection pool")?;
109
110 sqlx::query("SELECT 1")
112 .execute(&pool)
113 .await
114 .context("Failed to test database connection")?;
115
116 info!("Database connection pool initialized successfully");
117
118 Ok(Self { pool })
119 }
120
121 pub fn pool(&self) -> &PgPool {
123 &self.pool
124 }
125
126 pub async fn close(&self) {
128 self.pool.close().await;
129 }
130
131 #[instrument(skip(self, event))]
135 pub async fn insert_event(&self, event: &AnalyticsEvent) -> Result<Uuid> {
136 let event_json = serde_json::to_value(event)
137 .context("Failed to serialize event")?;
138
139 let row = sqlx::query(
140 r#"
141 INSERT INTO events (
142 event_id, timestamp, source_module, event_type,
143 correlation_id, parent_event_id, schema_version,
144 severity, environment, tags, payload
145 )
146 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
147 RETURNING event_id
148 "#
149 )
150 .bind(event.common.event_id)
151 .bind(event.common.timestamp)
152 .bind(serde_json::to_value(&event.common.source_module)?)
153 .bind(serde_json::to_value(&event.common.event_type)?)
154 .bind(event.common.correlation_id)
155 .bind(event.common.parent_event_id)
156 .bind(&event.common.schema_version)
157 .bind(serde_json::to_value(&event.common.severity)?)
158 .bind(&event.common.environment)
159 .bind(serde_json::to_value(&event.common.tags)?)
160 .bind(event_json)
161 .fetch_one(&self.pool)
162 .await
163 .context("Failed to insert event")?;
164
165 Ok(row.try_get("event_id")?)
166 }
167
168 #[instrument(skip(self, events))]
170 pub async fn insert_events_batch(&self, events: &[AnalyticsEvent]) -> Result<u64> {
171 if events.is_empty() {
172 return Ok(0);
173 }
174
175 let mut tx = self.pool.begin().await?;
176 let mut inserted = 0u64;
177
178 if events.len() > 100 {
180 let mut query_builder = sqlx::QueryBuilder::new(
182 "INSERT INTO events (event_id, timestamp, source_module, event_type, \
183 correlation_id, parent_event_id, schema_version, severity, environment, \
184 tags, payload) "
185 );
186
187 query_builder.push_values(events, |mut b, event| {
188 let event_json = serde_json::to_value(event).unwrap();
189 b.push_bind(event.common.event_id)
190 .push_bind(event.common.timestamp)
191 .push_bind(serde_json::to_value(&event.common.source_module).unwrap())
192 .push_bind(serde_json::to_value(&event.common.event_type).unwrap())
193 .push_bind(event.common.correlation_id)
194 .push_bind(event.common.parent_event_id)
195 .push_bind(&event.common.schema_version)
196 .push_bind(serde_json::to_value(&event.common.severity).unwrap())
197 .push_bind(&event.common.environment)
198 .push_bind(serde_json::to_value(&event.common.tags).unwrap())
199 .push_bind(event_json);
200 });
201
202 let result = query_builder.build().execute(&mut *tx).await?;
203 inserted = result.rows_affected();
204 } else {
205 for event in events {
207 self.insert_event(event).await?;
208 inserted += 1;
209 }
210 }
211
212 tx.commit().await?;
213 Ok(inserted)
214 }
215
216 #[instrument(skip(self))]
218 pub async fn query_events(
219 &self,
220 start: DateTime<Utc>,
221 end: DateTime<Utc>,
222 limit: Option<i64>,
223 ) -> Result<Vec<AnalyticsEvent>> {
224 let limit = limit.unwrap_or(1000);
225
226 let rows = sqlx::query(
227 r#"
228 SELECT payload
229 FROM events
230 WHERE timestamp >= $1 AND timestamp < $2
231 ORDER BY timestamp DESC
232 LIMIT $3
233 "#
234 )
235 .bind(start)
236 .bind(end)
237 .bind(limit)
238 .fetch_all(&self.pool)
239 .await
240 .context("Failed to query events")?;
241
242 let events: Vec<AnalyticsEvent> = rows
243 .into_iter()
244 .filter_map(|row| {
245 let payload: serde_json::Value = row.try_get("payload").ok()?;
246 serde_json::from_value(payload).ok()
247 })
248 .collect();
249
250 Ok(events)
251 }
252
253 #[instrument(skip(self))]
255 pub async fn query_events_by_correlation(
256 &self,
257 correlation_id: Uuid,
258 ) -> Result<Vec<AnalyticsEvent>> {
259 let rows = sqlx::query(
260 r#"
261 SELECT payload
262 FROM events
263 WHERE correlation_id = $1
264 ORDER BY timestamp ASC
265 "#
266 )
267 .bind(correlation_id)
268 .fetch_all(&self.pool)
269 .await
270 .context("Failed to query events by correlation")?;
271
272 let events: Vec<AnalyticsEvent> = rows
273 .into_iter()
274 .filter_map(|row| {
275 let payload: serde_json::Value = row.try_get("payload").ok()?;
276 serde_json::from_value(payload).ok()
277 })
278 .collect();
279
280 Ok(events)
281 }
282
283 #[instrument(skip(self))]
287 pub async fn store_aggregated_metric(
288 &self,
289 metric_name: &str,
290 time_window: TimeWindow,
291 window_start: DateTime<Utc>,
292 tags: &serde_json::Value,
293 measures: &StatisticalMeasures,
294 ) -> Result<()> {
295 sqlx::query(
296 r#"
297 INSERT INTO aggregated_metrics (
298 metric_name, time_window, window_start, tags,
299 avg, min, max, p50, p95, p99, stddev, count, sum
300 )
301 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
302 ON CONFLICT (metric_name, time_window, window_start, tags)
303 DO UPDATE SET
304 avg = EXCLUDED.avg,
305 min = EXCLUDED.min,
306 max = EXCLUDED.max,
307 p50 = EXCLUDED.p50,
308 p95 = EXCLUDED.p95,
309 p99 = EXCLUDED.p99,
310 stddev = EXCLUDED.stddev,
311 count = EXCLUDED.count,
312 sum = EXCLUDED.sum
313 "#
314 )
315 .bind(metric_name)
316 .bind(time_window.as_str())
317 .bind(window_start)
318 .bind(tags)
319 .bind(measures.avg)
320 .bind(measures.min)
321 .bind(measures.max)
322 .bind(measures.p50)
323 .bind(measures.p95)
324 .bind(measures.p99)
325 .bind(measures.stddev)
326 .bind(measures.count as i64)
327 .bind(measures.sum)
328 .execute(&self.pool)
329 .await
330 .context("Failed to store aggregated metric")?;
331
332 Ok(())
333 }
334
335 #[instrument(skip(self))]
337 pub async fn query_aggregated_metrics(
338 &self,
339 metric_name: &str,
340 time_window: TimeWindow,
341 start: DateTime<Utc>,
342 end: DateTime<Utc>,
343 ) -> Result<Vec<AggregatedMetricRow>> {
344 let rows = sqlx::query_as::<_, AggregatedMetricRow>(
345 r#"
346 SELECT
347 metric_name, time_window, window_start, tags,
348 avg, min, max, p50, p95, p99, stddev, count, sum
349 FROM aggregated_metrics
350 WHERE metric_name = $1
351 AND time_window = $2
352 AND window_start >= $3
353 AND window_start < $4
354 ORDER BY window_start ASC
355 "#
356 )
357 .bind(metric_name)
358 .bind(time_window.as_str())
359 .bind(start)
360 .bind(end)
361 .fetch_all(&self.pool)
362 .await
363 .context("Failed to query aggregated metrics")?;
364
365 Ok(rows)
366 }
367
368 #[instrument(skip(self))]
372 pub async fn store_anomaly(
373 &self,
374 anomaly_id: Uuid,
375 detected_at: DateTime<Utc>,
376 metric_name: &str,
377 anomaly_type: &str,
378 severity: &str,
379 value: f64,
380 expected_value: Option<f64>,
381 confidence_score: f64,
382 context: &serde_json::Value,
383 ) -> Result<Uuid> {
384 let result = sqlx::query(
385 r#"
386 INSERT INTO anomalies (
387 anomaly_id, detected_at, metric_name, anomaly_type,
388 severity, value, expected_value, confidence_score, context
389 )
390 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
391 RETURNING anomaly_id
392 "#
393 )
394 .bind(anomaly_id)
395 .bind(detected_at)
396 .bind(metric_name)
397 .bind(anomaly_type)
398 .bind(severity)
399 .bind(value)
400 .bind(expected_value)
401 .bind(confidence_score)
402 .bind(context)
403 .fetch_one(&self.pool)
404 .await
405 .context("Failed to store anomaly")?;
406
407 Ok(result.try_get("anomaly_id")?)
408 }
409
410 #[instrument(skip(self))]
412 pub async fn query_recent_anomalies(
413 &self,
414 since: DateTime<Utc>,
415 limit: Option<i64>,
416 ) -> Result<Vec<AnomalyRow>> {
417 let limit = limit.unwrap_or(100);
418
419 let rows = sqlx::query_as::<_, AnomalyRow>(
420 r#"
421 SELECT
422 anomaly_id, detected_at, metric_name, anomaly_type,
423 severity, value, expected_value, confidence_score, context
424 FROM anomalies
425 WHERE detected_at >= $1
426 ORDER BY detected_at DESC
427 LIMIT $2
428 "#
429 )
430 .bind(since)
431 .bind(limit)
432 .fetch_all(&self.pool)
433 .await
434 .context("Failed to query anomalies")?;
435
436 Ok(rows)
437 }
438
439 #[instrument(skip(self))]
443 pub async fn store_correlation(
444 &self,
445 correlation_id: Uuid,
446 correlation_type: &str,
447 source_event_id: Uuid,
448 target_event_id: Uuid,
449 strength: f64,
450 metadata: &serde_json::Value,
451 ) -> Result<Uuid> {
452 let result = sqlx::query(
453 r#"
454 INSERT INTO correlations (
455 correlation_id, correlation_type, source_event_id,
456 target_event_id, strength, metadata, created_at
457 )
458 VALUES ($1, $2, $3, $4, $5, $6, NOW())
459 RETURNING correlation_id
460 "#
461 )
462 .bind(correlation_id)
463 .bind(correlation_type)
464 .bind(source_event_id)
465 .bind(target_event_id)
466 .bind(strength)
467 .bind(metadata)
468 .fetch_one(&self.pool)
469 .await
470 .context("Failed to store correlation")?;
471
472 Ok(result.try_get("correlation_id")?)
473 }
474
475 pub async fn health_check(&self) -> Result<DatabaseHealth> {
479 let result = sqlx::query(
480 r#"
481 SELECT
482 (SELECT COUNT(*) FROM pg_stat_activity WHERE state = 'active') as active_connections,
483 (SELECT COUNT(*) FROM events WHERE timestamp > NOW() - INTERVAL '1 minute') as recent_events,
484 (SELECT pg_database_size(current_database())) as database_size
485 "#
486 )
487 .fetch_one(&self.pool)
488 .await?;
489
490 Ok(DatabaseHealth {
491 active_connections: result.try_get::<Option<i64>, _>("active_connections")?.unwrap_or(0),
492 recent_events: result.try_get::<Option<i64>, _>("recent_events")?.unwrap_or(0),
493 database_size_bytes: result.try_get::<Option<i64>, _>("database_size")?.unwrap_or(0),
494 })
495 }
496}
497
/// Row type for the `aggregated_metrics` table, as returned by
/// `Database::query_aggregated_metrics`.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct AggregatedMetricRow {
    pub metric_name: String,
    // Window granularity label (stored via `TimeWindow::as_str`).
    pub time_window: String,
    pub window_start: DateTime<Utc>,
    // Dimension tags; part of the table's upsert conflict key.
    pub tags: serde_json::Value,
    pub avg: f64,
    pub min: f64,
    pub max: f64,
    pub p50: f64,
    pub p95: f64,
    pub p99: f64,
    // Nullable in the schema, hence Option.
    pub stddev: Option<f64>,
    pub count: i64,
    pub sum: f64,
}
516
/// Row type for the `anomalies` table, as returned by
/// `Database::query_recent_anomalies`.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct AnomalyRow {
    pub anomaly_id: Uuid,
    pub detected_at: DateTime<Utc>,
    pub metric_name: String,
    pub anomaly_type: String,
    pub severity: String,
    // Observed metric value that triggered the anomaly.
    pub value: f64,
    // Nullable: absent when there was no baseline expectation.
    pub expected_value: Option<f64>,
    pub confidence_score: f64,
    // Free-form JSON context captured at detection time.
    pub context: serde_json::Value,
}
529
/// Snapshot returned by `Database::health_check`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseHealth {
    // Active sessions reported by `pg_stat_activity`.
    pub active_connections: i64,
    // Events with a timestamp in the last minute.
    pub recent_events: i64,
    // Size of the current database (`pg_database_size`), in bytes.
    pub database_size_bytes: i64,
}
536
#[cfg(test)]
mod tests {
    use super::*;

    /// `connection_string` is pure and synchronous, so a plain `#[test]`
    /// suffices — no need to spin up a tokio runtime. Asserting the exact
    /// URL also pins the field ordering, not just substrings.
    #[test]
    fn test_database_config_connection_string() {
        let config = DatabaseConfig::default();
        let conn_str = config.connection_string();
        assert_eq!(
            conn_str,
            "postgres://postgres:postgres@localhost:5432/llm_analytics"
        );
    }
}
548}