use crate::error::{AnalyticsError, Result};
use crate::models::*;
use futures::TryStreamExt;
use sqlx::{sqlite::SqlitePoolOptions, Executor, Pool, Sqlite, SqlitePool};
use std::path::Path;
use tracing::{debug, error, info};

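/// SQLite-backed store for aggregated analytics: minute/hour/day metric
/// roll-ups, per-endpoint statistics, error events, traffic patterns, and
/// periodic snapshots.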
#[derive(Clone)]
pub struct AnalyticsDatabase {
    pool: Pool<Sqlite>,
}

impl AnalyticsDatabase {
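    /// Opens a connection pool to the analytics database at the given path.
    ///
    /// A path of `:memory:` selects an in-memory database; any other path is
    /// converted to a `sqlite://` URL. WAL journaling and foreign-key
    /// enforcement are enabled after connecting.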
    pub async fn new(database_path: &Path) -> Result<Self> {
        let db_url = if database_path.to_str() == Some(":memory:") {
            "sqlite::memory:".to_string()
        } else {
            format!("sqlite://{}", database_path.display())
        };

        info!("Connecting to analytics database: {}", db_url);

        let pool = SqlitePoolOptions::new()
            .max_connections(10)
            .connect(&db_url)
            .await
            .map_err(|e| {
                error!("Failed to connect to analytics database: {}", e);
                AnalyticsError::Database(e)
            })?;

        sqlx::query("PRAGMA journal_mode = WAL").execute(&pool).await?;

        sqlx::query("PRAGMA foreign_keys = ON").execute(&pool).await?;

        Ok(Self { pool })
    }

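    /// Runs the bundled analytics schema migration
    /// (`migrations/001_analytics_schema.sql`) against the database.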
    pub async fn run_migrations(&self) -> Result<()> {
        info!("Running analytics database migrations");

        let migration_sql = include_str!("../migrations/001_analytics_schema.sql");

        let mut conn = self.pool.acquire().await?;

        let mut stream = conn.execute_many(migration_sql);

        // Drain the per-statement results so any migration error surfaces here.
        while stream
            .try_next()
            .await
            .map_err(|e| {
                error!("Migration error: {}", e);
                AnalyticsError::Migration(format!("Failed to execute migration: {}", e))
            })?
            .is_some()
        {}

        info!("Analytics database migrations completed successfully");
        Ok(())
    }

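    /// Returns a reference to the underlying SQLite connection pool.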
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }

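    /// Inserts a single per-minute metrics aggregate and returns the new row id.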
    pub async fn insert_minute_aggregate(&self, agg: &MetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            "INSERT INTO metrics_aggregates_minute (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

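    /// Inserts a batch of per-minute aggregates inside a single transaction.
    /// No-op for an empty slice; otherwise all rows commit atomically.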
    pub async fn insert_minute_aggregates_batch(
        &self,
        aggregates: &[MetricsAggregate],
    ) -> Result<()> {
        if aggregates.is_empty() {
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;

        for agg in aggregates {
            sqlx::query(
                r"
                INSERT INTO metrics_aggregates_minute (
                    timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                    request_count, error_count, latency_sum, latency_min, latency_max,
                    latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received, active_connections
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ",
            )
            .bind(agg.timestamp)
            .bind(&agg.protocol)
            .bind(&agg.method)
            .bind(&agg.endpoint)
            .bind(agg.status_code)
            .bind(&agg.workspace_id)
            .bind(&agg.environment)
            .bind(agg.request_count)
            .bind(agg.error_count)
            .bind(agg.latency_sum)
            .bind(agg.latency_min)
            .bind(agg.latency_max)
            .bind(agg.latency_p50)
            .bind(agg.latency_p95)
            .bind(agg.latency_p99)
            .bind(agg.bytes_sent)
            .bind(agg.bytes_received)
            .bind(agg.active_connections)
            .execute(&mut *tx)
            .await?;
        }

        tx.commit().await?;
        debug!("Inserted {} minute aggregates", aggregates.len());
        Ok(())
    }

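    /// Inserts a single per-hour metrics aggregate and returns the new row id.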
    pub async fn insert_hour_aggregate(&self, agg: &HourMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_hour (
                timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

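    /// Inserts a single per-day metrics aggregate and returns the new row id.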
    pub async fn insert_day_aggregate(&self, agg: &DayMetricsAggregate) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO metrics_aggregates_day (
                date, timestamp, protocol, method, endpoint, status_code, workspace_id, environment,
                request_count, error_count, latency_sum, latency_min, latency_max,
                latency_p50, latency_p95, latency_p99, bytes_sent, bytes_received,
                active_connections_avg, active_connections_max, unique_clients, peak_hour
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(&agg.date)
        .bind(agg.timestamp)
        .bind(&agg.protocol)
        .bind(&agg.method)
        .bind(&agg.endpoint)
        .bind(agg.status_code)
        .bind(&agg.workspace_id)
        .bind(&agg.environment)
        .bind(agg.request_count)
        .bind(agg.error_count)
        .bind(agg.latency_sum)
        .bind(agg.latency_min)
        .bind(agg.latency_max)
        .bind(agg.latency_p50)
        .bind(agg.latency_p95)
        .bind(agg.latency_p99)
        .bind(agg.bytes_sent)
        .bind(agg.bytes_received)
        .bind(agg.active_connections_avg)
        .bind(agg.active_connections_max)
        .bind(agg.unique_clients)
        .bind(agg.peak_hour)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

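    /// Inserts or updates cumulative statistics for an endpoint. Request,
    /// error, and byte counters are accumulated, min/max latencies are merged,
    /// and the derived fields (average, p95, status-code breakdown) take the
    /// incoming values.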
    pub async fn upsert_endpoint_stats(&self, stats: &EndpointStats) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO endpoint_stats (
                endpoint, protocol, method, workspace_id, environment,
                total_requests, total_errors, avg_latency_ms, min_latency_ms, max_latency_ms,
                p95_latency_ms, status_codes, total_bytes_sent, total_bytes_received,
                first_seen, last_seen
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (endpoint, protocol, COALESCE(method, ''), COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                total_requests = total_requests + excluded.total_requests,
                total_errors = total_errors + excluded.total_errors,
                avg_latency_ms = excluded.avg_latency_ms,
                min_latency_ms = MIN(min_latency_ms, excluded.min_latency_ms),
                max_latency_ms = MAX(max_latency_ms, excluded.max_latency_ms),
                p95_latency_ms = excluded.p95_latency_ms,
                status_codes = excluded.status_codes,
                total_bytes_sent = total_bytes_sent + excluded.total_bytes_sent,
                total_bytes_received = total_bytes_received + excluded.total_bytes_received,
                last_seen = excluded.last_seen,
                updated_at = strftime('%s', 'now')
            ",
        )
        .bind(&stats.endpoint)
        .bind(&stats.protocol)
        .bind(&stats.method)
        .bind(&stats.workspace_id)
        .bind(&stats.environment)
        .bind(stats.total_requests)
        .bind(stats.total_errors)
        .bind(stats.avg_latency_ms)
        .bind(stats.min_latency_ms)
        .bind(stats.max_latency_ms)
        .bind(stats.p95_latency_ms)
        .bind(&stats.status_codes)
        .bind(stats.total_bytes_sent)
        .bind(stats.total_bytes_received)
        .bind(stats.first_seen)
        .bind(stats.last_seen)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

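    /// Records a single error event and returns the new row id.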
    pub async fn insert_error_event(&self, error: &ErrorEvent) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO error_events (
                timestamp, protocol, method, endpoint, status_code,
                error_type, error_message, error_category,
                request_id, trace_id, span_id,
                client_ip, user_agent, workspace_id, environment, metadata
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(error.timestamp)
        .bind(&error.protocol)
        .bind(&error.method)
        .bind(&error.endpoint)
        .bind(error.status_code)
        .bind(&error.error_type)
        .bind(&error.error_message)
        .bind(&error.error_category)
        .bind(&error.request_id)
        .bind(&error.trace_id)
        .bind(&error.span_id)
        .bind(&error.client_ip)
        .bind(&error.user_agent)
        .bind(&error.workspace_id)
        .bind(&error.environment)
        .bind(&error.metadata)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

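    /// Inserts or updates an hourly traffic-pattern bucket. Request and error
    /// counts are accumulated; latency and unique-client figures take the
    /// incoming values.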
    pub async fn insert_traffic_pattern(&self, pattern: &TrafficPattern) -> Result<()> {
        sqlx::query(
            r"
            INSERT INTO traffic_patterns (
                date, hour, day_of_week, protocol, workspace_id, environment,
                request_count, error_count, avg_latency_ms, unique_clients
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (date, hour, protocol, COALESCE(workspace_id, ''), COALESCE(environment, ''))
            DO UPDATE SET
                request_count = request_count + excluded.request_count,
                error_count = error_count + excluded.error_count,
                avg_latency_ms = excluded.avg_latency_ms,
                unique_clients = excluded.unique_clients
            ",
        )
        .bind(&pattern.date)
        .bind(pattern.hour)
        .bind(pattern.day_of_week)
        .bind(&pattern.protocol)
        .bind(&pattern.workspace_id)
        .bind(&pattern.environment)
        .bind(pattern.request_count)
        .bind(pattern.error_count)
        .bind(pattern.avg_latency_ms)
        .bind(pattern.unique_clients)
        .execute(&self.pool)
        .await?;

        Ok(())
    }

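    /// Stores a point-in-time analytics snapshot and returns the new row id.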
    pub async fn insert_snapshot(&self, snapshot: &AnalyticsSnapshot) -> Result<i64> {
        let result = sqlx::query(
            r"
            INSERT INTO analytics_snapshots (
                timestamp, snapshot_type, total_requests, total_errors, avg_latency_ms,
                active_connections, protocol_stats, top_endpoints,
                memory_usage_bytes, cpu_usage_percent, thread_count, uptime_seconds,
                workspace_id, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ",
        )
        .bind(snapshot.timestamp)
        .bind(&snapshot.snapshot_type)
        .bind(snapshot.total_requests)
        .bind(snapshot.total_errors)
        .bind(snapshot.avg_latency_ms)
        .bind(snapshot.active_connections)
        .bind(&snapshot.protocol_stats)
        .bind(&snapshot.top_endpoints)
        .bind(snapshot.memory_usage_bytes)
        .bind(snapshot.cpu_usage_percent)
        .bind(snapshot.thread_count)
        .bind(snapshot.uptime_seconds)
        .bind(&snapshot.workspace_id)
        .bind(&snapshot.environment)
        .execute(&self.pool)
        .await?;

        Ok(result.last_insert_rowid())
    }

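    /// Queries per-minute aggregates matching the filter, newest first.
    ///
    /// The WHERE clause is built dynamically from the filter's set fields, and
    /// parameters are bound in the same order the clauses were appended, so
    /// the two `if` chains below must stay in sync.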
    pub async fn get_minute_aggregates(
        &self,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<MetricsAggregate>> {
        let mut query = String::from("SELECT * FROM metrics_aggregates_minute WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.protocol.is_some() {
            query.push_str(" AND protocol = ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.method.is_some() {
            query.push_str(" AND method = ?");
        }
        if filter.status_code.is_some() {
            query.push_str(" AND status_code = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }
        if filter.environment.is_some() {
            query.push_str(" AND environment = ?");
        }

        query.push_str(" ORDER BY timestamp DESC");

        if filter.limit.is_some() {
            query.push_str(" LIMIT ?");
        }

        let mut sql_query = sqlx::query_as::<_, MetricsAggregate>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref protocol) = filter.protocol {
            sql_query = sql_query.bind(protocol);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref method) = filter.method {
            sql_query = sql_query.bind(method);
        }
        if let Some(status) = filter.status_code {
            sql_query = sql_query.bind(status);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }
        if let Some(ref env) = filter.environment {
            sql_query = sql_query.bind(env);
        }
        if let Some(limit) = filter.limit {
            sql_query = sql_query.bind(limit);
        }

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

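    /// Returns up to `limit` endpoints ordered by total request count,
    /// optionally restricted to a workspace.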
    pub async fn get_top_endpoints(
        &self,
        limit: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<EndpointStats>> {
        let mut query = String::from("SELECT * FROM endpoint_stats WHERE 1=1");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY total_requests DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, EndpointStats>(&query);

        if let Some(workspace) = workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

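    /// Returns up to `limit` error events matching the filter, newest first.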
    pub async fn get_recent_errors(
        &self,
        limit: i64,
        filter: &AnalyticsFilter,
    ) -> Result<Vec<ErrorEvent>> {
        let mut query = String::from("SELECT * FROM error_events WHERE 1=1");

        if filter.start_time.is_some() {
            query.push_str(" AND timestamp >= ?");
        }
        if filter.end_time.is_some() {
            query.push_str(" AND timestamp <= ?");
        }
        if filter.endpoint.is_some() {
            query.push_str(" AND endpoint = ?");
        }
        if filter.workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY timestamp DESC LIMIT ?");

        let mut sql_query = sqlx::query_as::<_, ErrorEvent>(&query);

        if let Some(start) = filter.start_time {
            sql_query = sql_query.bind(start);
        }
        if let Some(end) = filter.end_time {
            sql_query = sql_query.bind(end);
        }
        if let Some(ref endpoint) = filter.endpoint {
            sql_query = sql_query.bind(endpoint);
        }
        if let Some(ref workspace) = filter.workspace_id {
            sql_query = sql_query.bind(workspace);
        }

        sql_query = sql_query.bind(limit);

        let results = sql_query.fetch_all(&self.pool).await?;

        Ok(results)
    }

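    /// Returns traffic-pattern rows for the last `days` days, ordered by date
    /// and hour, optionally restricted to a workspace.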
    pub async fn get_traffic_patterns(
        &self,
        days: i64,
        workspace_id: Option<&str>,
    ) -> Result<Vec<TrafficPattern>> {
        let start_date = chrono::Utc::now() - chrono::Duration::days(days);
        let start_date_str = start_date.format("%Y-%m-%d").to_string();

        let mut query = String::from("SELECT * FROM traffic_patterns WHERE date >= ?");

        if workspace_id.is_some() {
            query.push_str(" AND workspace_id = ?");
        }

        query.push_str(" ORDER BY date ASC, hour ASC");

        let mut query_builder = sqlx::query_as::<_, TrafficPattern>(&query).bind(start_date_str);

        if let Some(workspace) = workspace_id {
            query_builder = query_builder.bind(workspace);
        }

        let results = query_builder.fetch_all(&self.pool).await?;

        Ok(results)
    }

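    /// Deletes minute aggregates older than `days` days and returns the number
    /// of rows removed.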
    pub async fn cleanup_minute_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (days as i64 * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_minute WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!(
            "Cleaned up {} minute aggregates older than {} days",
            result.rows_affected(),
            days
        );
        Ok(result.rows_affected())
    }

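    /// Deletes hour aggregates older than `days` days and returns the number
    /// of rows removed.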
    pub async fn cleanup_hour_aggregates(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (days as i64 * 86400);

        let result = sqlx::query("DELETE FROM metrics_aggregates_hour WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} hour aggregates older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

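    /// Deletes error events older than `days` days and returns the number of
    /// rows removed.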
    pub async fn cleanup_error_events(&self, days: u32) -> Result<u64> {
        let cutoff = chrono::Utc::now().timestamp() - (days as i64 * 86400);

        let result = sqlx::query("DELETE FROM error_events WHERE timestamp < ?")
            .bind(cutoff)
            .execute(&self.pool)
            .await?;

        info!("Cleaned up {} error events older than {} days", result.rows_affected(), days);
        Ok(result.rows_affected())
    }

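    /// Runs `VACUUM` to reclaim space after large deletions.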
    pub async fn vacuum(&self) -> Result<()> {
        info!("Running VACUUM on analytics database");
        sqlx::query("VACUUM").execute(&self.pool).await?;
        info!("VACUUM completed");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_database_creation() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();
    }

    #[tokio::test]
    async fn test_insert_minute_aggregate() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            timestamp: chrono::Utc::now().timestamp(),
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 100,
            error_count: 5,
            latency_sum: 500.0,
            latency_min: Some(10.0),
            latency_max: Some(100.0),
            latency_p50: Some(45.0),
            latency_p95: Some(95.0),
            latency_p99: Some(99.0),
            bytes_sent: 10000,
            bytes_received: 5000,
            active_connections: Some(10),
            created_at: None,
        };

        let id = db.insert_minute_aggregate(&agg).await.unwrap();
        assert!(id > 0);
    }
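
    // A minimal sketch of the retention path, mirroring the aggregate
    // construction above: insert one day-old row, then check that
    // cleanup_minute_aggregates(0) (cutoff = "now") removes it.
    #[tokio::test]
    async fn test_cleanup_minute_aggregates() {
        let db = AnalyticsDatabase::new(Path::new(":memory:")).await.unwrap();
        db.run_migrations().await.unwrap();

        let agg = MetricsAggregate {
            id: None,
            // One day in the past so the row falls before the cleanup cutoff.
            timestamp: chrono::Utc::now().timestamp() - 86_400,
            protocol: "HTTP".to_string(),
            method: Some("GET".to_string()),
            endpoint: Some("/api/test".to_string()),
            status_code: Some(200),
            workspace_id: None,
            environment: None,
            request_count: 1,
            error_count: 0,
            latency_sum: 10.0,
            latency_min: Some(10.0),
            latency_max: Some(10.0),
            latency_p50: Some(10.0),
            latency_p95: Some(10.0),
            latency_p99: Some(10.0),
            bytes_sent: 100,
            bytes_received: 50,
            active_connections: Some(1),
            created_at: None,
        };
        db.insert_minute_aggregate(&agg).await.unwrap();

        let deleted = db.cleanup_minute_aggregates(0).await.unwrap();
        assert_eq!(deleted, 1);
    }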
}