kaccy_db/database_size_monitor.rs

//! Database Size Monitor
//!
//! Tracks database growth over time and provides capacity planning insights.
//! Monitors table sizes, index sizes, and predicts future storage requirements.
//!
//! # Features
//!
//! - Track database and table size growth over time
//! - Predict future storage requirements
//! - Identify rapidly growing tables
//! - Monitor index bloat
//! - Alert on capacity thresholds
//! - Historical growth rate analysis
//! - Capacity planning recommendations
//!
//! # Example
//!
//! ```rust
//! use kaccy_db::database_size_monitor::{DatabaseSizeMonitor, SizeMonitorConfig};
//! use std::time::Duration;
//!
//! let config = SizeMonitorConfig {
//!     warning_threshold_gb: 100.0,
//!     critical_threshold_gb: 150.0,
//!     rapid_growth_threshold: 0.2, // 20% growth
//!     collection_interval: Duration::from_secs(3600),
//!     max_history_points: 1000,
//! };
//!
//! let monitor = DatabaseSizeMonitor::new(config);
//! ```
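//!
//! The monitor does not schedule collection itself; `collection_interval` is
//! carried in the config for the caller to use. A minimal sketch of a periodic
//! collection loop, assuming a Tokio runtime (not a dependency this module
//! declares) and an already-connected pool:
//!
//! ```rust,ignore
//! use std::time::Duration;
//!
//! async fn run_size_monitor(
//!     monitor: DatabaseSizeMonitor,
//!     pool: sqlx::PgPool,
//!     interval: Duration, // e.g. the configured collection_interval
//! ) {
//!     loop {
//!         // Each call appends a snapshot to the in-memory history.
//!         if let Err(e) = monitor.collect_metrics(&pool).await {
//!             tracing::warn!(error = %e, "size metric collection failed");
//!         }
//!         tokio::time::sleep(interval).await;
//!     }
//! }
//! ```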

use crate::error::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, info};

/// Configuration for the database size monitor
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SizeMonitorConfig {
    /// Database size warning threshold (GB)
    pub warning_threshold_gb: f64,

    /// Database size critical threshold (GB)
    pub critical_threshold_gb: f64,

    /// Growth rate threshold for rapid growth alert (0.0-1.0)
    pub rapid_growth_threshold: f64,

    /// How often to collect size metrics
    pub collection_interval: Duration,

    /// Maximum number of historical data points to keep
    pub max_history_points: usize,
}

impl Default for SizeMonitorConfig {
    fn default() -> Self {
        Self {
            warning_threshold_gb: 100.0,
            critical_threshold_gb: 150.0,
            rapid_growth_threshold: 0.2,                    // 20%
            collection_interval: Duration::from_secs(3600), // 1 hour
            max_history_points: 1000,
        }
    }
}

/// Table size information
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableSize {
    /// Table name (schema.table)
    pub table_name: String,

    /// Table size in bytes
    pub table_bytes: i64,

    /// Index size in bytes
    pub indexes_bytes: i64,

    /// Total size (table + indexes) in bytes
    pub total_bytes: i64,

    /// Number of rows (estimated)
    pub row_count: i64,

    /// Average row size in bytes
    pub avg_row_size: i64,

    /// When this measurement was taken
    pub measured_at: DateTime<Utc>,
}

impl TableSize {
    /// Get table size in GB
    pub fn table_gb(&self) -> f64 {
        self.table_bytes as f64 / 1_073_741_824.0
    }

    /// Get total size in GB
    pub fn total_gb(&self) -> f64 {
        self.total_bytes as f64 / 1_073_741_824.0
    }

    /// Get human-readable table size
    pub fn table_size_formatted(&self) -> String {
        format_bytes(self.table_bytes)
    }

    /// Get human-readable total size
    pub fn total_size_formatted(&self) -> String {
        format_bytes(self.total_bytes)
    }
}

/// Database size snapshot
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseSnapshot {
    /// When this snapshot was taken
    pub timestamp: DateTime<Utc>,

    /// Total database size in bytes
    pub total_size_bytes: i64,

    /// Individual table sizes
    pub tables: Vec<TableSize>,

    /// Number of tables
    pub table_count: usize,
}

impl DatabaseSnapshot {
    /// Get total size in GB
    pub fn total_gb(&self) -> f64 {
        self.total_size_bytes as f64 / 1_073_741_824.0
    }

    /// Get largest tables (top N)
    pub fn largest_tables(&self, limit: usize) -> Vec<TableSize> {
        let mut tables = self.tables.clone();
        tables.sort_by(|a, b| b.total_bytes.cmp(&a.total_bytes));
        tables.truncate(limit);
        tables
    }
}

/// Growth statistics for a table
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrowthStats {
    /// Table name
    pub table_name: String,

    /// Current size in bytes
    pub current_size_bytes: i64,

    /// Size at start of monitoring period
    pub initial_size_bytes: i64,

    /// Absolute growth in bytes
    pub growth_bytes: i64,

    /// Relative growth as a fraction of the initial size (0.2 = 20%; exceeds 1.0 for more than 100% growth)
    pub growth_rate: f64,

    /// Average growth per day (bytes)
    pub avg_daily_growth_bytes: i64,

    /// Estimated days until critical threshold
    pub days_until_critical: Option<f64>,
}

/// Size alert
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SizeAlert {
    /// Alert type
    pub alert_type: SizeAlertType,

    /// Alert message
    pub message: String,

    /// Current value
    pub current_value: f64,

    /// Threshold value
    pub threshold: f64,

    /// Affected table (if applicable)
    pub affected_table: Option<String>,

    /// When the alert was triggered
    pub triggered_at: DateTime<Utc>,

    /// Recommended action
    pub recommendation: String,
}

/// Type of size alert
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum SizeAlertType {
    /// Database approaching capacity
    DatabaseCapacity,

    /// Table growing rapidly
    RapidGrowth,

    /// Large table detected
    LargeTable,

    /// Index bloat detected
    IndexBloat,
}

/// Size monitoring report
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SizeReport {
    /// When this report was generated
    pub generated_at: DateTime<Utc>,

    /// Current database snapshot
    pub current_snapshot: DatabaseSnapshot,

    /// Growth statistics
    pub growth_stats: Vec<GrowthStats>,

    /// Active alerts
    pub alerts: Vec<SizeAlert>,

    /// Capacity forecast
    pub forecast: CapacityForecast,
}

/// Capacity forecast
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityForecast {
    /// Current database size in GB
    pub current_size_gb: f64,

    /// Projected size in 30 days (GB)
    pub projected_30d_gb: f64,

    /// Projected size in 90 days (GB)
    pub projected_90d_gb: f64,

    /// Average daily growth rate (GB/day)
    pub avg_growth_gb_per_day: f64,

    /// Days until warning threshold
    pub days_until_warning: Option<f64>,

    /// Days until critical threshold
    pub days_until_critical: Option<f64>,
}

/// Database size monitor
pub struct DatabaseSizeMonitor {
    config: SizeMonitorConfig,
    history: Arc<Mutex<VecDeque<DatabaseSnapshot>>>,
    table_history: Arc<Mutex<HashMap<String, VecDeque<TableSize>>>>,
}

impl DatabaseSizeMonitor {
    /// Create a new database size monitor
    pub fn new(config: SizeMonitorConfig) -> Self {
        Self {
            config,
            history: Arc::new(Mutex::new(VecDeque::new())),
            table_history: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Create a monitor with default configuration
    pub fn with_defaults() -> Self {
        Self::new(SizeMonitorConfig::default())
    }

    /// Collect current size metrics
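    ///
    /// Queries the current database and per-table sizes, appends the snapshot
    /// to the in-memory histories, and trims both to `max_history_points`.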
    pub async fn collect_metrics(&self, pool: &PgPool) -> Result<DatabaseSnapshot> {
        info!("Collecting database size metrics");

        let total_size_bytes = self.get_database_size(pool).await?;
        let tables = self.get_table_sizes(pool).await?;

        let snapshot = DatabaseSnapshot {
            timestamp: Utc::now(),
            total_size_bytes,
            table_count: tables.len(),
            tables,
        };

        // Store in history
        if let Ok(mut history) = self.history.lock() {
            history.push_back(snapshot.clone());
            while history.len() > self.config.max_history_points {
                history.pop_front();
            }
        }

        // Store table history
        if let Ok(mut table_history) = self.table_history.lock() {
            for table in &snapshot.tables {
                let entry = table_history
                    .entry(table.table_name.clone())
                    .or_insert_with(VecDeque::new);
                entry.push_back(table.clone());
                while entry.len() > self.config.max_history_points {
                    entry.pop_front();
                }
            }
        }

        debug!(
            size_gb = snapshot.total_gb(),
            tables = snapshot.table_count,
            "Collected size metrics"
        );

        Ok(snapshot)
    }

    /// Generate size monitoring report
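    ///
    /// A sketch of consuming the report, assuming an already-connected
    /// `PgPool` named `pool` and a `monitor` in scope:
    ///
    /// ```rust,ignore
    /// let report = monitor.generate_report(&pool).await?;
    /// println!(
    ///     "{:.1} GB now, ~{:.1} GB in 30 days",
    ///     report.forecast.current_size_gb,
    ///     report.forecast.projected_30d_gb,
    /// );
    /// for alert in &report.alerts {
    ///     eprintln!("[{:?}] {}: {}", alert.alert_type, alert.message, alert.recommendation);
    /// }
    /// ```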
    pub async fn generate_report(&self, pool: &PgPool) -> Result<SizeReport> {
        let current_snapshot = self.collect_metrics(pool).await?;
        let growth_stats = self.calculate_growth_stats();
        let alerts = self.check_alerts(&current_snapshot, &growth_stats);
        let forecast = self.generate_forecast(&current_snapshot);

        info!(
            size_gb = current_snapshot.total_gb(),
            alerts = alerts.len(),
            "Generated size report"
        );

        Ok(SizeReport {
            generated_at: Utc::now(),
            current_snapshot,
            growth_stats,
            alerts,
            forecast,
        })
    }

    /// Get database size in bytes
    async fn get_database_size(&self, pool: &PgPool) -> Result<i64> {
        let size = sqlx::query_scalar::<_, i64>("SELECT pg_database_size(current_database())")
            .fetch_one(pool)
            .await?;

        Ok(size)
    }

    /// Get table sizes
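    ///
    /// Joins `pg_tables` against `pg_stat_user_tables` so `row_count` comes
    /// from the planner's `n_live_tup` estimate; system schemas are excluded.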
    async fn get_table_sizes(&self, pool: &PgPool) -> Result<Vec<TableSize>> {
        // quote_ident guards the size functions against schema or table names
        // that need quoting (mixed case, special characters).
        let query = r#"
            SELECT
                t.schemaname || '.' || t.tablename AS table_name,
                pg_table_size(quote_ident(t.schemaname) || '.' || quote_ident(t.tablename)) AS table_bytes,
                pg_indexes_size(quote_ident(t.schemaname) || '.' || quote_ident(t.tablename)) AS indexes_bytes,
                pg_total_relation_size(quote_ident(t.schemaname) || '.' || quote_ident(t.tablename)) AS total_bytes,
                COALESCE(s.n_live_tup, 0) AS row_count
            FROM pg_tables t
            LEFT JOIN pg_stat_user_tables s
                ON t.schemaname = s.schemaname
                AND t.tablename = s.relname
            WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
            ORDER BY total_bytes DESC
        "#;

        let rows = sqlx::query_as::<_, (String, i64, i64, i64, i64)>(query)
            .fetch_all(pool)
            .await?;

        let tables = rows
            .into_iter()
            .map(
                |(table_name, table_bytes, indexes_bytes, total_bytes, row_count)| {
                    let avg_row_size = if row_count > 0 {
                        table_bytes / row_count
                    } else {
                        0
                    };

                    TableSize {
                        table_name,
                        table_bytes,
                        indexes_bytes,
                        total_bytes,
                        row_count,
                        avg_row_size,
                        measured_at: Utc::now(),
                    }
                },
            )
            .collect();

        Ok(tables)
    }

    /// Calculate growth statistics
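    ///
    /// Compares the oldest and newest retained measurement for each table, so
    /// the reported rates cover at most the window kept by `max_history_points`.
    /// Tables with fewer than two measurements are skipped.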
    fn calculate_growth_stats(&self) -> Vec<GrowthStats> {
        let table_history = match self.table_history.lock() {
            Ok(hist) => hist,
            Err(_) => return Vec::new(),
        };

        let mut stats = Vec::new();

        for (table_name, history) in table_history.iter() {
            if history.len() < 2 {
                continue;
            }

            let first = &history[0];
            let last = history.back().unwrap();

            let growth_bytes = last.total_bytes - first.total_bytes;
            let growth_rate = if first.total_bytes > 0 {
                growth_bytes as f64 / first.total_bytes as f64
            } else {
                0.0
            };

            let time_diff = last.measured_at.signed_duration_since(first.measured_at);
            let days = time_diff.num_seconds() as f64 / 86400.0;

            let avg_daily_growth_bytes = if days > 0.0 {
                (growth_bytes as f64 / days) as i64
            } else {
                0
            };

            stats.push(GrowthStats {
                table_name: table_name.clone(),
                current_size_bytes: last.total_bytes,
                initial_size_bytes: first.total_bytes,
                growth_bytes,
                growth_rate,
                avg_daily_growth_bytes,
                days_until_critical: None,
            });
        }

        // total_cmp avoids the panic that partial_cmp().unwrap() would hit on NaN.
        stats.sort_by(|a, b| b.growth_rate.total_cmp(&a.growth_rate));

        stats
    }

    /// Check for alert conditions
    fn check_alerts(
        &self,
        snapshot: &DatabaseSnapshot,
        growth_stats: &[GrowthStats],
    ) -> Vec<SizeAlert> {
        let mut alerts = Vec::new();

        let size_gb = snapshot.total_gb();

        // Database capacity alerts
        if size_gb >= self.config.critical_threshold_gb {
            alerts.push(SizeAlert {
                alert_type: SizeAlertType::DatabaseCapacity,
                message: "Database size exceeds critical threshold".to_string(),
                current_value: size_gb,
                threshold: self.config.critical_threshold_gb,
                affected_table: None,
                triggered_at: Utc::now(),
                recommendation: "Urgent: Archive old data or increase storage capacity".to_string(),
            });
        } else if size_gb >= self.config.warning_threshold_gb {
            alerts.push(SizeAlert {
                alert_type: SizeAlertType::DatabaseCapacity,
                message: "Database size exceeds warning threshold".to_string(),
                current_value: size_gb,
                threshold: self.config.warning_threshold_gb,
                affected_table: None,
                triggered_at: Utc::now(),
                recommendation: "Plan for storage expansion or data archival".to_string(),
            });
        }

        // Rapid growth alerts
        for stat in growth_stats {
            if stat.growth_rate >= self.config.rapid_growth_threshold {
                alerts.push(SizeAlert {
                    alert_type: SizeAlertType::RapidGrowth,
                    message: format!("Table '{}' is growing rapidly", stat.table_name),
                    current_value: stat.growth_rate,
                    threshold: self.config.rapid_growth_threshold,
                    affected_table: Some(stat.table_name.clone()),
                    triggered_at: Utc::now(),
                    recommendation: "Investigate data retention policy for this table".to_string(),
                });
            }
        }

        alerts
    }

    /// Generate capacity forecast
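    ///
    /// Projections are a straight linear extrapolation between the first and
    /// last retained snapshots: `projected = current + avg_daily_growth * days`.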
    fn generate_forecast(&self, snapshot: &DatabaseSnapshot) -> CapacityForecast {
        let history = match self.history.lock() {
            Ok(hist) => hist.iter().cloned().collect::<Vec<_>>(),
            Err(_) => Vec::new(),
        };

        let current_size_gb = snapshot.total_gb();

        let avg_growth_gb_per_day = if history.len() >= 2 {
            let first = &history[0];
            let last = history.last().unwrap();

            let growth_bytes = last.total_size_bytes - first.total_size_bytes;
            let time_diff = last.timestamp.signed_duration_since(first.timestamp);
            let days = time_diff.num_seconds() as f64 / 86400.0;

            if days > 0.0 {
                (growth_bytes as f64 / days) / 1_073_741_824.0
            } else {
                0.0
            }
        } else {
            0.0
        };

        let projected_30d_gb = current_size_gb + (avg_growth_gb_per_day * 30.0);
        let projected_90d_gb = current_size_gb + (avg_growth_gb_per_day * 90.0);

        let days_until_warning = if avg_growth_gb_per_day > 0.0 {
            let remaining = self.config.warning_threshold_gb - current_size_gb;
            if remaining > 0.0 {
                Some(remaining / avg_growth_gb_per_day)
            } else {
                Some(0.0)
            }
        } else {
            None
        };

        let days_until_critical = if avg_growth_gb_per_day > 0.0 {
            let remaining = self.config.critical_threshold_gb - current_size_gb;
            if remaining > 0.0 {
                Some(remaining / avg_growth_gb_per_day)
            } else {
                Some(0.0)
            }
        } else {
            None
        };

        CapacityForecast {
            current_size_gb,
            projected_30d_gb,
            projected_90d_gb,
            avg_growth_gb_per_day,
            days_until_warning,
            days_until_critical,
        }
    }

    /// Get historical snapshots
    pub fn get_history(&self) -> Vec<DatabaseSnapshot> {
        self.history
            .lock()
            .ok()
            .map(|h| h.iter().cloned().collect())
            .unwrap_or_default()
    }

    /// Clear all historical data
    pub fn clear_history(&self) {
        if let Ok(mut history) = self.history.lock() {
            history.clear();
        }
        if let Ok(mut table_history) = self.table_history.lock() {
            table_history.clear();
        }
    }
}

/// Format bytes to human-readable string
fn format_bytes(bytes: i64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    let mut size = bytes as f64;
    let mut unit_idx = 0;

    while size >= 1024.0 && unit_idx < UNITS.len() - 1 {
        size /= 1024.0;
        unit_idx += 1;
    }

    format!("{:.2} {}", size, UNITS[unit_idx])
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_size_monitor_config_default() {
        let config = SizeMonitorConfig::default();
        assert_eq!(config.warning_threshold_gb, 100.0);
        assert_eq!(config.critical_threshold_gb, 150.0);
        assert_eq!(config.rapid_growth_threshold, 0.2);
    }

    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(1024), "1.00 KB");
        assert_eq!(format_bytes(1_048_576), "1.00 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.00 GB");
    }

    #[test]
    fn test_table_size_formatting() {
        let table_size = TableSize {
            table_name: "users".to_string(),
            table_bytes: 1_073_741_824,
            indexes_bytes: 536_870_912,
            total_bytes: 1_610_612_736,
            row_count: 1000,
            avg_row_size: 1_073_741,
            measured_at: Utc::now(),
        };

        assert_eq!(table_size.table_gb(), 1.0);
        assert_eq!(table_size.table_size_formatted(), "1.00 GB");
    }

    #[test]
    fn test_database_snapshot_largest_tables() {
        let snapshot = DatabaseSnapshot {
            timestamp: Utc::now(),
            total_size_bytes: 10_737_418_240,
            table_count: 3,
            tables: vec![
                TableSize {
                    table_name: "small".to_string(),
                    table_bytes: 1_048_576,
                    indexes_bytes: 0,
                    total_bytes: 1_048_576,
                    row_count: 10,
                    avg_row_size: 104_857,
                    measured_at: Utc::now(),
                },
                TableSize {
                    table_name: "large".to_string(),
                    table_bytes: 5_368_709_120,
                    indexes_bytes: 0,
                    total_bytes: 5_368_709_120,
                    row_count: 1000,
                    avg_row_size: 5_368_709,
                    measured_at: Utc::now(),
                },
                TableSize {
                    table_name: "medium".to_string(),
                    table_bytes: 1_073_741_824,
                    indexes_bytes: 0,
                    total_bytes: 1_073_741_824,
                    row_count: 100,
                    avg_row_size: 10_737_418,
                    measured_at: Utc::now(),
                },
            ],
        };

        let largest = snapshot.largest_tables(2);
        assert_eq!(largest.len(), 2);
        assert_eq!(largest[0].table_name, "large");
        assert_eq!(largest[1].table_name, "medium");
    }

    #[test]
    fn test_growth_stats_serialization() {
        let stats = GrowthStats {
            table_name: "users".to_string(),
            current_size_bytes: 2_147_483_648,
            initial_size_bytes: 1_073_741_824,
            growth_bytes: 1_073_741_824,
            growth_rate: 1.0,
            avg_daily_growth_bytes: 10_737_418,
            days_until_critical: Some(100.0),
        };

        let json = serde_json::to_string(&stats).unwrap();
        assert!(json.contains("users"));
        assert!(json.contains("growth_rate"));
    }

    #[test]
    fn test_size_alert_serialization() {
        let alert = SizeAlert {
            alert_type: SizeAlertType::DatabaseCapacity,
            message: "Database is full".to_string(),
            current_value: 150.0,
            threshold: 100.0,
            affected_table: None,
            triggered_at: Utc::now(),
            recommendation: "Archive data".to_string(),
        };

        let json = serde_json::to_string(&alert).unwrap();
        assert!(json.contains("DatabaseCapacity"));
    }

    #[test]
    fn test_capacity_forecast_serialization() {
        let forecast = CapacityForecast {
            current_size_gb: 80.0,
            projected_30d_gb: 90.0,
            projected_90d_gb: 110.0,
            avg_growth_gb_per_day: 0.5,
            days_until_warning: Some(40.0),
            days_until_critical: Some(140.0),
        };

        let json = serde_json::to_string(&forecast).unwrap();
        assert!(json.contains("projected_30d_gb"));
    }

    #[test]
    fn test_monitor_with_defaults() {
        let monitor = DatabaseSizeMonitor::with_defaults();
        assert_eq!(monitor.get_history().len(), 0);
    }

    #[test]
    fn test_monitor_clear_history() {
        let monitor = DatabaseSizeMonitor::with_defaults();
        monitor.clear_history();
        assert_eq!(monitor.get_history().len(), 0);
    }
}