//! Database size monitoring: collects size metrics from PostgreSQL, tracks
//! growth history per table, raises threshold alerts, and produces a simple
//! linear capacity forecast.

use crate::error::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, info};

/// Configuration for the database size monitor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SizeMonitorConfig {
    /// Database size, in GB, at which a warning alert is raised.
    pub warning_threshold_gb: f64,

    /// Database size, in GB, at which a critical alert is raised.
    pub critical_threshold_gb: f64,

    /// Fractional growth over the observed history (0.2 = 20%) above which
    /// a table is flagged as growing rapidly.
    pub rapid_growth_threshold: f64,

    /// How often size metrics are collected.
    pub collection_interval: Duration,

    /// Maximum number of data points retained per history buffer.
    pub max_history_points: usize,
}

impl Default for SizeMonitorConfig {
    fn default() -> Self {
        Self {
            warning_threshold_gb: 100.0,
            critical_threshold_gb: 150.0,
            rapid_growth_threshold: 0.2,
            collection_interval: Duration::from_secs(3600),
            max_history_points: 1000,
        }
    }
}

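// A minimal usage sketch (the values here are hypothetical): override
// individual thresholds and keep the remaining defaults via struct-update
// syntax.
//
//     let config = SizeMonitorConfig {
//         warning_threshold_gb: 50.0,
//         critical_threshold_gb: 80.0,
//         ..SizeMonitorConfig::default()
//     };
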
/// Size metrics for a single table, captured at one point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSize {
    /// Schema-qualified table name (`schema.table`).
    pub table_name: String,

    /// Size of the table itself (excluding indexes), in bytes.
    pub table_bytes: i64,

    /// Combined size of the table's indexes, in bytes.
    pub indexes_bytes: i64,

    /// Total relation size including indexes, in bytes.
    pub total_bytes: i64,

    /// Estimated live row count (from `pg_stat_user_tables.n_live_tup`).
    pub row_count: i64,

    /// Average row size in bytes (`table_bytes / row_count`).
    pub avg_row_size: i64,

    /// When this measurement was taken.
    pub measured_at: DateTime<Utc>,
}

impl TableSize {
    pub fn table_gb(&self) -> f64 {
        self.table_bytes as f64 / 1_073_741_824.0
    }

    pub fn total_gb(&self) -> f64 {
        self.total_bytes as f64 / 1_073_741_824.0
    }

    pub fn table_size_formatted(&self) -> String {
        format_bytes(self.table_bytes)
    }

    pub fn total_size_formatted(&self) -> String {
        format_bytes(self.total_bytes)
    }
}

/// A point-in-time snapshot of the whole database's size.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseSnapshot {
    /// When the snapshot was taken.
    pub timestamp: DateTime<Utc>,

    /// Total database size in bytes (`pg_database_size`).
    pub total_size_bytes: i64,

    /// Per-table size metrics.
    pub tables: Vec<TableSize>,

    /// Number of tables measured.
    pub table_count: usize,
}

impl DatabaseSnapshot {
    pub fn total_gb(&self) -> f64 {
        self.total_size_bytes as f64 / 1_073_741_824.0
    }

    /// Returns the `limit` largest tables, sorted by total size descending.
    pub fn largest_tables(&self, limit: usize) -> Vec<TableSize> {
        let mut tables = self.tables.clone();
        tables.sort_by(|a, b| b.total_bytes.cmp(&a.total_bytes));
        tables.truncate(limit);
        tables
    }
}

/// Growth statistics for a single table over the recorded history window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrowthStats {
    /// Schema-qualified table name.
    pub table_name: String,

    /// Most recent total size, in bytes.
    pub current_size_bytes: i64,

    /// Size at the start of the history window, in bytes.
    pub initial_size_bytes: i64,

    /// Absolute growth over the window, in bytes.
    pub growth_bytes: i64,

    /// Fractional growth relative to the initial size (0.2 = 20%).
    pub growth_rate: f64,

    /// Average growth per day, in bytes.
    pub avg_daily_growth_bytes: i64,

    /// Estimated days until the critical threshold is reached, if known.
    pub days_until_critical: Option<f64>,
}

/// An alert raised when a size or growth threshold is crossed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SizeAlert {
    /// Category of the alert.
    pub alert_type: SizeAlertType,

    /// Human-readable description.
    pub message: String,

    /// The measured value that triggered the alert.
    pub current_value: f64,

    /// The threshold that was crossed.
    pub threshold: f64,

    /// The table involved, if the alert is table-specific.
    pub affected_table: Option<String>,

    /// When the alert was triggered.
    pub triggered_at: DateTime<Utc>,

    /// Suggested remediation.
    pub recommendation: String,
}

/// The kinds of size alerts the monitor can raise.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SizeAlertType {
    /// Overall database size crossed a capacity threshold.
    DatabaseCapacity,

    /// A table grew faster than the configured rate.
    RapidGrowth,

    /// A single table is disproportionately large.
    LargeTable,

    /// Index size is large relative to table size.
    IndexBloat,
}

/// A full size report: current state, growth, alerts, and forecast.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SizeReport {
    /// When the report was generated.
    pub generated_at: DateTime<Utc>,

    /// The snapshot the report is based on.
    pub current_snapshot: DatabaseSnapshot,

    /// Per-table growth statistics.
    pub growth_stats: Vec<GrowthStats>,

    /// Alerts raised against the current snapshot.
    pub alerts: Vec<SizeAlert>,

    /// Linear capacity projection.
    pub forecast: CapacityForecast,
}

/// A straight-line capacity projection based on the recorded history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityForecast {
    /// Current database size, in GB.
    pub current_size_gb: f64,

    /// Projected size in 30 days, in GB.
    pub projected_30d_gb: f64,

    /// Projected size in 90 days, in GB.
    pub projected_90d_gb: f64,

    /// Average observed growth, in GB per day.
    pub avg_growth_gb_per_day: f64,

    /// Estimated days until the warning threshold is reached, if growing.
    pub days_until_warning: Option<f64>,

    /// Estimated days until the critical threshold is reached, if growing.
    pub days_until_critical: Option<f64>,
}

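// The forecast is a straight-line projection: with current size `s` GB and
// average growth `g` GB/day, the projected size after `d` days is
// `s + g * d`, and a threshold `t` is reached after `(t - s) / g` days when
// `g > 0`. For example, at s = 80 GB and g = 0.5 GB/day, the 30-day
// projection is 95 GB and a 100 GB warning threshold is reached in 40 days.
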
/// Monitors database size over time, keeping an in-memory history of
/// snapshots and per-table measurements.
pub struct DatabaseSizeMonitor {
    config: SizeMonitorConfig,
    history: Arc<Mutex<VecDeque<DatabaseSnapshot>>>,
    table_history: Arc<Mutex<HashMap<String, VecDeque<TableSize>>>>,
}

impl DatabaseSizeMonitor {
    /// Creates a monitor with the given configuration.
    pub fn new(config: SizeMonitorConfig) -> Self {
        Self {
            config,
            history: Arc::new(Mutex::new(VecDeque::new())),
            table_history: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Creates a monitor with the default configuration.
    pub fn with_defaults() -> Self {
        Self::new(SizeMonitorConfig::default())
    }

    /// Collects a snapshot of current database and table sizes and appends
    /// it to the in-memory history, trimming to `max_history_points`.
    pub async fn collect_metrics(&self, pool: &PgPool) -> Result<DatabaseSnapshot> {
        info!("Collecting database size metrics");

        let total_size_bytes = self.get_database_size(pool).await?;
        let tables = self.get_table_sizes(pool).await?;

        let snapshot = DatabaseSnapshot {
            timestamp: Utc::now(),
            total_size_bytes,
            table_count: tables.len(),
            tables,
        };

        if let Ok(mut history) = self.history.lock() {
            history.push_back(snapshot.clone());
            while history.len() > self.config.max_history_points {
                history.pop_front();
            }
        }

        if let Ok(mut table_history) = self.table_history.lock() {
            for table in &snapshot.tables {
                let entry = table_history
                    .entry(table.table_name.clone())
                    .or_insert_with(VecDeque::new);
                entry.push_back(table.clone());
                while entry.len() > self.config.max_history_points {
                    entry.pop_front();
                }
            }
        }

        debug!(
            size_gb = snapshot.total_gb(),
            tables = snapshot.table_count,
            "Collected size metrics"
        );

        Ok(snapshot)
    }

    /// Produces a full size report: snapshot, growth stats, alerts, and a
    /// capacity forecast.
    pub async fn generate_report(&self, pool: &PgPool) -> Result<SizeReport> {
        let current_snapshot = self.collect_metrics(pool).await?;
        let growth_stats = self.calculate_growth_stats();
        let alerts = self.check_alerts(&current_snapshot, &growth_stats);
        let forecast = self.generate_forecast(&current_snapshot);

        info!(
            size_gb = current_snapshot.total_gb(),
            alerts = alerts.len(),
            "Generated size report"
        );

        Ok(SizeReport {
            generated_at: Utc::now(),
            current_snapshot,
            growth_stats,
            alerts,
            forecast,
        })
    }

    async fn get_database_size(&self, pool: &PgPool) -> Result<i64> {
        let size = sqlx::query_scalar::<_, i64>("SELECT pg_database_size(current_database())")
            .fetch_one(pool)
            .await?;

        Ok(size)
    }

    async fn get_table_sizes(&self, pool: &PgPool) -> Result<Vec<TableSize>> {
        // Columns are table-qualified because both pg_tables and
        // pg_stat_user_tables expose `schemaname`, and identifiers are run
        // through quote_ident so mixed-case or unusual table names resolve
        // correctly. n_live_tup is a statistics-collector estimate, not an
        // exact count.
        let query = r#"
            SELECT
                t.schemaname || '.' || t.tablename AS table_name,
                pg_table_size((quote_ident(t.schemaname) || '.' || quote_ident(t.tablename))::regclass) AS table_bytes,
                pg_indexes_size((quote_ident(t.schemaname) || '.' || quote_ident(t.tablename))::regclass) AS indexes_bytes,
                pg_total_relation_size((quote_ident(t.schemaname) || '.' || quote_ident(t.tablename))::regclass) AS total_bytes,
                COALESCE(s.n_live_tup, 0) AS row_count
            FROM pg_tables t
            LEFT JOIN pg_stat_user_tables s
                ON t.schemaname = s.schemaname AND t.tablename = s.relname
            WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
            ORDER BY total_bytes DESC
        "#;

        let rows = sqlx::query_as::<_, (String, i64, i64, i64, i64)>(query)
            .fetch_all(pool)
            .await?;

        let tables = rows
            .into_iter()
            .map(
                |(table_name, table_bytes, indexes_bytes, total_bytes, row_count)| {
                    let avg_row_size = if row_count > 0 {
                        table_bytes / row_count
                    } else {
                        0
                    };

                    TableSize {
                        table_name,
                        table_bytes,
                        indexes_bytes,
                        total_bytes,
                        row_count,
                        avg_row_size,
                        measured_at: Utc::now(),
                    }
                },
            )
            .collect();

        Ok(tables)
    }

    /// Computes growth statistics for every table with at least two recorded
    /// measurements, sorted by growth rate descending.
    fn calculate_growth_stats(&self) -> Vec<GrowthStats> {
        let table_history = match self.table_history.lock() {
            Ok(hist) => hist,
            Err(_) => return Vec::new(),
        };

        let mut stats = Vec::new();

        for (table_name, history) in table_history.iter() {
            if history.len() < 2 {
                continue;
            }

            let first = &history[0];
            let last = history.back().unwrap();

            let growth_bytes = last.total_bytes - first.total_bytes;
            let growth_rate = if first.total_bytes > 0 {
                growth_bytes as f64 / first.total_bytes as f64
            } else {
                0.0
            };

            let time_diff = last.measured_at.signed_duration_since(first.measured_at);
            let days = time_diff.num_seconds() as f64 / 86400.0;

            let avg_daily_growth_bytes = if days > 0.0 {
                (growth_bytes as f64 / days) as i64
            } else {
                0
            };

            stats.push(GrowthStats {
                table_name: table_name.clone(),
                current_size_bytes: last.total_bytes,
                initial_size_bytes: first.total_bytes,
                growth_bytes,
                growth_rate,
                avg_daily_growth_bytes,
                days_until_critical: None,
            });
        }

        // Treat incomparable (NaN) rates as equal rather than panicking.
        stats.sort_by(|a, b| {
            b.growth_rate
                .partial_cmp(&a.growth_rate)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        stats
    }

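    // Worked example for the computation above: a table measured at 10 GB in
    // the earliest snapshot and 12 GB in the latest has growth_bytes = 2 GB
    // and growth_rate = 2 / 10 = 0.2, exactly meeting the default
    // rapid_growth_threshold of 0.2.
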
    /// Checks the snapshot and growth statistics against the configured
    /// thresholds, returning any alerts that fired.
    fn check_alerts(
        &self,
        snapshot: &DatabaseSnapshot,
        growth_stats: &[GrowthStats],
    ) -> Vec<SizeAlert> {
        let mut alerts = Vec::new();

        let size_gb = snapshot.total_gb();

        if size_gb >= self.config.critical_threshold_gb {
            alerts.push(SizeAlert {
                alert_type: SizeAlertType::DatabaseCapacity,
                message: "Database size exceeds critical threshold".to_string(),
                current_value: size_gb,
                threshold: self.config.critical_threshold_gb,
                affected_table: None,
                triggered_at: Utc::now(),
                recommendation: "Urgent: Archive old data or increase storage capacity".to_string(),
            });
        } else if size_gb >= self.config.warning_threshold_gb {
            alerts.push(SizeAlert {
                alert_type: SizeAlertType::DatabaseCapacity,
                message: "Database size exceeds warning threshold".to_string(),
                current_value: size_gb,
                threshold: self.config.warning_threshold_gb,
                affected_table: None,
                triggered_at: Utc::now(),
                recommendation: "Plan for storage expansion or data archival".to_string(),
            });
        }

        for stat in growth_stats {
            if stat.growth_rate >= self.config.rapid_growth_threshold {
                alerts.push(SizeAlert {
                    alert_type: SizeAlertType::RapidGrowth,
                    message: format!("Table '{}' is growing rapidly", stat.table_name),
                    current_value: stat.growth_rate,
                    threshold: self.config.rapid_growth_threshold,
                    affected_table: Some(stat.table_name.clone()),
                    triggered_at: Utc::now(),
                    recommendation: "Investigate data retention policy for this table".to_string(),
                });
            }
        }

        alerts
    }

    /// Builds a linear capacity forecast from the recorded snapshot history.
    fn generate_forecast(&self, snapshot: &DatabaseSnapshot) -> CapacityForecast {
        let history = match self.history.lock() {
            Ok(hist) => hist.iter().cloned().collect::<Vec<_>>(),
            Err(_) => Vec::new(),
        };

        let current_size_gb = snapshot.total_gb();

        let avg_growth_gb_per_day = if history.len() >= 2 {
            let first = &history[0];
            let last = history.last().unwrap();

            let growth_bytes = last.total_size_bytes - first.total_size_bytes;
            let time_diff = last.timestamp.signed_duration_since(first.timestamp);
            let days = time_diff.num_seconds() as f64 / 86400.0;

            if days > 0.0 {
                (growth_bytes as f64 / days) / 1_073_741_824.0
            } else {
                0.0
            }
        } else {
            0.0
        };

        let projected_30d_gb = current_size_gb + (avg_growth_gb_per_day * 30.0);
        let projected_90d_gb = current_size_gb + (avg_growth_gb_per_day * 90.0);

        let days_until_warning = if avg_growth_gb_per_day > 0.0 {
            let remaining = self.config.warning_threshold_gb - current_size_gb;
            if remaining > 0.0 {
                Some(remaining / avg_growth_gb_per_day)
            } else {
                Some(0.0)
            }
        } else {
            None
        };

        let days_until_critical = if avg_growth_gb_per_day > 0.0 {
            let remaining = self.config.critical_threshold_gb - current_size_gb;
            if remaining > 0.0 {
                Some(remaining / avg_growth_gb_per_day)
            } else {
                Some(0.0)
            }
        } else {
            None
        };

        CapacityForecast {
            current_size_gb,
            projected_30d_gb,
            projected_90d_gb,
            avg_growth_gb_per_day,
            days_until_warning,
            days_until_critical,
        }
    }

    /// Returns a copy of the recorded snapshot history.
    pub fn get_history(&self) -> Vec<DatabaseSnapshot> {
        self.history
            .lock()
            .ok()
            .map(|h| h.iter().cloned().collect())
            .unwrap_or_default()
    }

    /// Clears both the snapshot history and the per-table history.
    pub fn clear_history(&self) {
        if let Ok(mut history) = self.history.lock() {
            history.clear();
        }
        if let Ok(mut table_history) = self.table_history.lock() {
            table_history.clear();
        }
    }
}

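// A minimal integration sketch, not part of this module's API: collecting
// metrics on the configured interval from a background task. It assumes a
// tokio runtime and an already-constructed `PgPool` named `pool`; transient
// failures are logged instead of aborting the loop.
//
//     let monitor = std::sync::Arc::new(DatabaseSizeMonitor::with_defaults());
//     let interval = SizeMonitorConfig::default().collection_interval;
//     tokio::spawn({
//         let monitor = std::sync::Arc::clone(&monitor);
//         let pool = pool.clone();
//         async move {
//             loop {
//                 if let Err(e) = monitor.collect_metrics(&pool).await {
//                     tracing::warn!(error = ?e, "size metric collection failed");
//                 }
//                 tokio::time::sleep(interval).await;
//             }
//         }
//     });
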
/// Formats a byte count using binary units (1 KB = 1024 B), e.g.
/// 1536 -> "1.50 KB".
fn format_bytes(bytes: i64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    let mut size = bytes as f64;
    let mut unit_idx = 0;

    while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
        size /= 1024.0;
        unit_idx += 1;
    }

    format!("{:.2} {}", size, UNITS[unit_idx])
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_size_monitor_config_default() {
        let config = SizeMonitorConfig::default();
        assert_eq!(config.warning_threshold_gb, 100.0);
        assert_eq!(config.critical_threshold_gb, 150.0);
        assert_eq!(config.rapid_growth_threshold, 0.2);
    }

    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(1024), "1.00 KB");
        assert_eq!(format_bytes(1_048_576), "1.00 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.00 GB");
    }

    #[test]
    fn test_table_size_formatting() {
        let table_size = TableSize {
            table_name: "users".to_string(),
            table_bytes: 1_073_741_824,
            indexes_bytes: 536_870_912,
            total_bytes: 1_610_612_736,
            row_count: 1000,
            avg_row_size: 1_073_741,
            measured_at: Utc::now(),
        };

        assert_eq!(table_size.table_gb(), 1.0);
        assert_eq!(table_size.table_size_formatted(), "1.00 GB");
    }

    #[test]
    fn test_database_snapshot_largest_tables() {
        let snapshot = DatabaseSnapshot {
            timestamp: Utc::now(),
            total_size_bytes: 10_737_418_240,
            table_count: 3,
            tables: vec![
                TableSize {
                    table_name: "small".to_string(),
                    table_bytes: 1_048_576,
                    indexes_bytes: 0,
                    total_bytes: 1_048_576,
                    row_count: 10,
                    avg_row_size: 104_857,
                    measured_at: Utc::now(),
                },
                TableSize {
                    table_name: "large".to_string(),
                    table_bytes: 5_368_709_120,
                    indexes_bytes: 0,
                    total_bytes: 5_368_709_120,
                    row_count: 1000,
                    avg_row_size: 5_368_709,
                    measured_at: Utc::now(),
                },
                TableSize {
                    table_name: "medium".to_string(),
                    table_bytes: 1_073_741_824,
                    indexes_bytes: 0,
                    total_bytes: 1_073_741_824,
                    row_count: 100,
                    avg_row_size: 10_737_418,
                    measured_at: Utc::now(),
                },
            ],
        };

        let largest = snapshot.largest_tables(2);
        assert_eq!(largest.len(), 2);
        assert_eq!(largest[0].table_name, "large");
        assert_eq!(largest[1].table_name, "medium");
    }

    #[test]
    fn test_growth_stats_serialization() {
        let stats = GrowthStats {
            table_name: "users".to_string(),
            current_size_bytes: 2_147_483_648,
            initial_size_bytes: 1_073_741_824,
            growth_bytes: 1_073_741_824,
            growth_rate: 1.0,
            avg_daily_growth_bytes: 10_737_418,
            days_until_critical: Some(100.0),
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("users"));
        assert!(json.contains("growth_rate"));
    }

    #[test]
    fn test_size_alert_serialization() {
        let alert = SizeAlert {
            alert_type: SizeAlertType::DatabaseCapacity,
            message: "Database is full".to_string(),
            current_value: 150.0,
            threshold: 100.0,
            affected_table: None,
            triggered_at: Utc::now(),
            recommendation: "Archive data".to_string(),
        };

        let json = serde_json::to_string(&alert).unwrap();
        assert!(json.contains("DatabaseCapacity"));
    }

    #[test]
    fn test_capacity_forecast_serialization() {
        let forecast = CapacityForecast {
            current_size_gb: 80.0,
            projected_30d_gb: 90.0,
            projected_90d_gb: 110.0,
            avg_growth_gb_per_day: 0.5,
            days_until_warning: Some(40.0),
            days_until_critical: Some(140.0),
        };

        let json = serde_json::to_string(&forecast).unwrap();
        assert!(json.contains("projected_30d_gb"));
    }

    #[test]
    fn test_monitor_with_defaults() {
        let monitor = DatabaseSizeMonitor::with_defaults();
        assert_eq!(monitor.get_history().len(), 0);
    }

    #[test]
    fn test_monitor_clear_history() {
        let monitor = DatabaseSizeMonitor::with_defaults();
        monitor.clear_history();
        assert_eq!(monitor.get_history().len(), 0);
    }
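
    // Two extra checks, added as a sketch: they exercise the private
    // `check_alerts` path (not covered above) and the sub-kilobyte branch
    // of `format_bytes`.
    #[test]
    fn test_check_alerts_critical_threshold() {
        let monitor = DatabaseSizeMonitor::with_defaults();
        let snapshot = DatabaseSnapshot {
            timestamp: Utc::now(),
            // 200 GB, above the default 150 GB critical threshold.
            total_size_bytes: 200 * 1_073_741_824,
            tables: Vec::new(),
            table_count: 0,
        };

        let alerts = monitor.check_alerts(&snapshot, &[]);
        assert_eq!(alerts.len(), 1);
        assert_eq!(alerts[0].alert_type, SizeAlertType::DatabaseCapacity);
    }

    #[test]
    fn test_format_bytes_small_values() {
        assert_eq!(format_bytes(0), "0.00 B");
        assert_eq!(format_bytes(512), "512.00 B");
    }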
}