kaccy_db/
backup.rs

//! Database backup and recovery utilities

use crate::error::{DbError, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::process::Command;

/// Backup configuration
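///
/// A minimal construction sketch; the values shown are illustrative, and the
/// import path assumes this module is reachable as `kaccy_db::backup`:
///
/// ```no_run
/// use std::path::PathBuf;
/// use kaccy_db::backup::BackupConfig;
///
/// let config = BackupConfig {
///     backup_dir: PathBuf::from("/var/backups/kaccy"),
///     database_name: "kaccy".to_string(),
///     ..BackupConfig::default() // retention: 30 days, max 10 backups, gzip on
/// };
/// ```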
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupConfig {
    /// Directory to store backups
    pub backup_dir: PathBuf,
    /// Database name
    pub database_name: String,
    /// Retention period in days (backups older than this will be cleaned up)
    pub retention_days: u32,
    /// Maximum number of backups to keep
    pub max_backups: usize,
    /// Compress backups with gzip
    pub compress: bool,
}

impl Default for BackupConfig {
    fn default() -> Self {
        Self {
            backup_dir: PathBuf::from("./backups"),
            database_name: "kaccy".to_string(),
            retention_days: 30,
            max_backups: 10,
            compress: true,
        }
    }
}

/// Backup metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupMetadata {
    /// Backup file path
    pub file_path: PathBuf,
    /// Backup creation timestamp
    pub created_at: DateTime<Utc>,
    /// Database name
    pub database_name: String,
    /// File size in bytes
    pub size_bytes: u64,
    /// Whether the backup is compressed
    pub compressed: bool,
}

/// Point-in-time recovery configuration
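///
/// Note: these paths only take effect if the PostgreSQL server itself has WAL
/// archiving enabled, i.e. `archive_mode = on` plus an `archive_command` in
/// `postgresql.conf` that copies segments into `wal_archive_dir` (for example
/// `archive_command = 'cp %p /path/to/wal_archive/%f'`).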
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PitrConfig {
    /// WAL archive directory
    pub wal_archive_dir: PathBuf,
    /// Base backup directory
    pub base_backup_dir: PathBuf,
    /// Retention period in days
    pub retention_days: u32,
}

impl Default for PitrConfig {
    fn default() -> Self {
        Self {
            wal_archive_dir: PathBuf::from("./wal_archive"),
            base_backup_dir: PathBuf::from("./base_backups"),
            retention_days: 7,
        }
    }
}

/// Database backup manager
pub struct BackupManager {
    config: BackupConfig,
}

impl BackupManager {
    /// Create a new backup manager
    pub fn new(config: BackupConfig) -> Self {
        Self { config }
    }

    /// Ensure backup directory exists
    async fn ensure_backup_dir(&self) -> Result<()> {
        // `create_dir_all` succeeds if the directory already exists, so no
        // separate (blocking) `exists()` check is needed
        fs::create_dir_all(&self.config.backup_dir)
            .await
            .map_err(|e| DbError::Other(format!("Failed to create backup directory: {}", e)))?;
        Ok(())
    }

    /// Generate backup filename
    fn generate_backup_filename(&self) -> String {
        let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
        let extension = if self.config.compress {
            "sql.gz"
        } else {
            "sql"
        };
        format!("{}_{}.{}", self.config.database_name, timestamp, extension)
    }

    /// Create a database backup using pg_dump
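    ///
    /// Requires `pg_dump` (and `gzip` when compression is enabled) on `PATH`.
    /// A usage sketch with a placeholder connection URL, assuming this module
    /// is reachable as `kaccy_db::backup`:
    ///
    /// ```no_run
    /// use kaccy_db::backup::{BackupConfig, BackupManager};
    ///
    /// async fn demo() {
    ///     let manager = BackupManager::new(BackupConfig::default());
    ///     let meta = manager
    ///         .create_backup("postgres://user:pass@localhost/kaccy")
    ///         .await
    ///         .expect("backup failed");
    ///     println!("wrote {} ({} bytes)", meta.file_path.display(), meta.size_bytes);
    /// }
    /// ```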
    pub async fn create_backup(&self, database_url: &str) -> Result<BackupMetadata> {
        self.ensure_backup_dir().await?;

        let filename = self.generate_backup_filename();
        let file_path = self.config.backup_dir.join(&filename);

        // Run pg_dump in plain-text format, without ownership or ACL statements
        let mut cmd = Command::new("pg_dump");
        cmd.arg(database_url)
            .arg("--format=plain")
            .arg("--no-owner")
            .arg("--no-acl");

        if self.config.compress {
            // Capture the plain dump in memory first
            let output = cmd
                .output()
                .await
                .map_err(|e| DbError::Other(format!("Failed to execute pg_dump: {}", e)))?;

            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                return Err(DbError::Other(format!("pg_dump failed: {}", stderr)));
            }

            // Compress the dump through gzip. The stdin write runs on a
            // separate task so gzip's stdout is drained concurrently; writing
            // everything before waiting can deadlock once the pipe buffer
            // fills on large dumps.
            let mut child = Command::new("gzip")
                .arg("-c")
                .stdin(std::process::Stdio::piped())
                .stdout(std::process::Stdio::piped())
                .spawn()
                .map_err(|e| DbError::Other(format!("Failed to spawn gzip: {}", e)))?;

            let mut stdin = child
                .stdin
                .take()
                .ok_or_else(|| DbError::Other("Failed to open gzip stdin".to_string()))?;
            let dump = output.stdout;
            let writer = tokio::spawn(async move {
                use tokio::io::AsyncWriteExt;
                // Dropping stdin at the end of the task closes the pipe
                stdin.write_all(&dump).await
            });

            let gzip_output = child
                .wait_with_output()
                .await
                .map_err(|e| DbError::Other(format!("Failed to wait for gzip: {}", e)))?;
            writer
                .await
                .map_err(|e| DbError::Other(format!("gzip writer task failed: {}", e)))?
                .map_err(|e| DbError::Other(format!("Failed to write to gzip: {}", e)))?;

            if !gzip_output.status.success() {
                return Err(DbError::Other("gzip failed".to_string()));
            }

            fs::write(&file_path, gzip_output.stdout)
                .await
                .map_err(|e| DbError::Other(format!("Failed to write backup file: {}", e)))?;
        } else {
            cmd.arg(format!("--file={}", file_path.display()));

            let output = cmd
                .output()
                .await
                .map_err(|e| DbError::Other(format!("Failed to execute pg_dump: {}", e)))?;

            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                return Err(DbError::Other(format!("pg_dump failed: {}", stderr)));
            }
        }

        let metadata = fs::metadata(&file_path)
            .await
            .map_err(|e| DbError::Other(format!("Failed to read backup metadata: {}", e)))?;

        Ok(BackupMetadata {
            file_path,
            created_at: Utc::now(),
            database_name: self.config.database_name.clone(),
            size_bytes: metadata.len(),
            compressed: self.config.compress,
        })
    }

    /// List all available backups
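    ///
    /// Backups are matched by filename prefix and `.sql`/`.sql.gz` suffix and
    /// returned newest first. A usage sketch, assuming this module is
    /// reachable as `kaccy_db::backup`:
    ///
    /// ```no_run
    /// use kaccy_db::backup::{BackupConfig, BackupManager};
    ///
    /// async fn demo() {
    ///     let manager = BackupManager::new(BackupConfig::default());
    ///     for backup in manager.list_backups().await.expect("listing failed") {
    ///         println!("{} ({} bytes)", backup.file_path.display(), backup.size_bytes);
    ///     }
    /// }
    /// ```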
    pub async fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
        if !self.config.backup_dir.exists() {
            return Ok(Vec::new());
        }

        let mut backups = Vec::new();
        let mut entries = fs::read_dir(&self.config.backup_dir)
            .await
            .map_err(|e| DbError::Other(format!("Failed to read backup directory: {}", e)))?;

        while let Some(entry) = entries
            .next_entry()
            .await
            .map_err(|e| DbError::Other(format!("Failed to read directory entry: {}", e)))?
        {
            let path = entry.path();
            if path.is_file() {
                let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

                if file_name.starts_with(&self.config.database_name)
                    && (file_name.ends_with(".sql") || file_name.ends_with(".sql.gz"))
                {
                    let is_compressed = file_name.ends_with(".gz");
                    let metadata = fs::metadata(&path).await.map_err(|e| {
                        DbError::Other(format!("Failed to read file metadata: {}", e))
                    })?;

                    backups.push(BackupMetadata {
                        file_path: path,
                        // `created()` is not available on every platform or
                        // filesystem; fall back to the current time when the
                        // creation timestamp cannot be read
                        created_at: metadata
                            .created()
                            .ok()
                            .and_then(|t| {
                                DateTime::from_timestamp(
                                    t.duration_since(std::time::UNIX_EPOCH).ok()?.as_secs() as i64,
                                    0,
                                )
                            })
                            .unwrap_or_else(Utc::now),
                        database_name: self.config.database_name.clone(),
                        size_bytes: metadata.len(),
                        compressed: is_compressed,
                    });
                }
            }
        }

        // Sort by creation time, newest first
        backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        Ok(backups)
    }

    /// Restore database from a backup file
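    ///
    /// Requires `psql` (and `gunzip` for `.sql.gz` files) on `PATH`. A usage
    /// sketch with placeholder paths, assuming this module is reachable as
    /// `kaccy_db::backup`:
    ///
    /// ```no_run
    /// use std::path::Path;
    /// use kaccy_db::backup::{BackupConfig, BackupManager};
    ///
    /// async fn demo() {
    ///     let manager = BackupManager::new(BackupConfig::default());
    ///     manager
    ///         .restore_backup(
    ///             Path::new("./backups/kaccy_20240101_120000.sql.gz"),
    ///             "postgres://user:pass@localhost/kaccy",
    ///         )
    ///         .await
    ///         .expect("restore failed");
    /// }
    /// ```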
    pub async fn restore_backup(&self, backup_path: &Path, database_url: &str) -> Result<()> {
        if !backup_path.exists() {
            return Err(DbError::Other("Backup file not found".to_string()));
        }

        let compressed = backup_path
            .extension()
            .and_then(|e| e.to_str())
            .map(|e| e == "gz")
            .unwrap_or(false);

        if compressed {
            // Decompress the whole file in memory, then pipe the SQL to psql
            let gunzip_output = Command::new("gunzip")
                .arg("-c")
                .arg(backup_path)
                .output()
                .await
                .map_err(|e| DbError::Other(format!("Failed to execute gunzip: {}", e)))?;

            if !gunzip_output.status.success() {
                let stderr = String::from_utf8_lossy(&gunzip_output.stderr);
                return Err(DbError::Other(format!("gunzip failed: {}", stderr)));
            }

            let mut child = Command::new("psql")
                .arg(database_url)
                // Stop at the first SQL error instead of psql's default of
                // continuing (and exiting 0 anyway)
                .arg("-v")
                .arg("ON_ERROR_STOP=1")
                .stdin(std::process::Stdio::piped())
                // Pipe stdout/stderr so wait_with_output() can capture them;
                // with inherited handles the stderr check below would always
                // see an empty string
                .stdout(std::process::Stdio::piped())
                .stderr(std::process::Stdio::piped())
                .spawn()
                .map_err(|e| DbError::Other(format!("Failed to spawn psql: {}", e)))?;

            let mut stdin = child
                .stdin
                .take()
                .ok_or_else(|| DbError::Other("Failed to open psql stdin".to_string()))?;
            let sql = gunzip_output.stdout;
            // Feed the SQL from a separate task so psql's output is drained
            // concurrently (avoids a pipe-buffer deadlock on large restores)
            let writer = tokio::spawn(async move {
                use tokio::io::AsyncWriteExt;
                stdin.write_all(&sql).await
            });

            let output = child
                .wait_with_output()
                .await
                .map_err(|e| DbError::Other(format!("Failed to wait for psql: {}", e)))?;
            writer
                .await
                .map_err(|e| DbError::Other(format!("psql writer task failed: {}", e)))?
                .map_err(|e| DbError::Other(format!("Failed to write to psql: {}", e)))?;

            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                return Err(DbError::Other(format!("psql failed: {}", stderr)));
            }
        } else {
            // Restore directly with psql
            let output = Command::new("psql")
                .arg(database_url)
                // Stop at the first SQL error instead of psql's default of
                // continuing (and exiting 0 anyway)
                .arg("-v")
                .arg("ON_ERROR_STOP=1")
                .arg("-f")
                .arg(backup_path)
                .output()
                .await
                .map_err(|e| DbError::Other(format!("Failed to execute psql: {}", e)))?;

            if !output.status.success() {
                let stderr = String::from_utf8_lossy(&output.stderr);
                return Err(DbError::Other(format!("psql failed: {}", stderr)));
            }
        }

        Ok(())
    }

    /// Clean up old backups based on retention policy
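    ///
    /// Applies both limits from [`BackupConfig`]: backups older than
    /// `retention_days` and backups beyond `max_backups` are removed. Returns
    /// the deleted paths. A usage sketch:
    ///
    /// ```no_run
    /// use kaccy_db::backup::{BackupConfig, BackupManager};
    ///
    /// async fn demo() {
    ///     let manager = BackupManager::new(BackupConfig::default());
    ///     let deleted = manager.cleanup_old_backups().await.expect("cleanup failed");
    ///     println!("removed {} expired backup(s)", deleted.len());
    /// }
    /// ```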
    pub async fn cleanup_old_backups(&self) -> Result<Vec<PathBuf>> {
        // `list_backups` returns newest first, so index `i` counts back from
        // the most recent backup
        let backups = self.list_backups().await?;
        let mut deleted = Vec::new();

        let cutoff_date = Utc::now() - chrono::Duration::days(self.config.retention_days as i64);

        for (i, backup) in backups.iter().enumerate() {
            // Delete if older than the retention period OR beyond max_backups
            if backup.created_at < cutoff_date || i >= self.config.max_backups {
                fs::remove_file(&backup.file_path)
                    .await
                    .map_err(|e| DbError::Other(format!("Failed to delete backup: {}", e)))?;
                deleted.push(backup.file_path.clone());
            }
        }

        Ok(deleted)
    }

    /// Get total size of all backups, in bytes
    pub async fn get_total_backup_size(&self) -> Result<u64> {
        let backups = self.list_backups().await?;
        Ok(backups.iter().map(|b| b.size_bytes).sum())
    }
}

/// Verify database integrity
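///
/// Checks that the database answers a trivial query and that the critical
/// tables exist. A usage sketch, assuming an already-connected pool and that
/// this module is reachable as `kaccy_db::backup`:
///
/// ```no_run
/// use sqlx::PgPool;
///
/// async fn check(pool: &PgPool) {
///     let ok = kaccy_db::backup::verify_database_integrity(pool)
///         .await
///         .expect("integrity check failed to run");
///     assert!(ok, "critical tables are missing");
/// }
/// ```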
pub async fn verify_database_integrity(pool: &PgPool) -> Result<bool> {
    // Check that the database is reachable; a failed query propagates via `?`
    sqlx::query("SELECT 1").fetch_one(pool).await?;

    // Verify critical tables exist
    let tables = [
        "users",
        "tokens",
        "balances",
        "orders",
        "trades",
        "audit_logs",
    ];
    for table in tables {
        // Bind the table name rather than interpolating it into the SQL
        let count: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = $1",
        )
        .bind(table)
        .fetch_one(pool)
        .await?;

        if count.0 == 0 {
            return Ok(false);
        }
    }

    Ok(true)
}

/// Database statistics
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
pub struct DatabaseStats {
    pub database_name: String,
    pub size_bytes: i64,
    pub table_count: i64,
    pub index_count: i64,
}

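/// Get database statistics
///
/// `db_name` is only used to label the result; the size and counts always
/// refer to the database the pool is connected to. A usage sketch, assuming
/// this module is reachable as `kaccy_db::backup`:
///
/// ```no_run
/// use sqlx::PgPool;
///
/// async fn report(pool: &PgPool) {
///     let stats = kaccy_db::backup::get_database_stats(pool, "kaccy")
///         .await
///         .expect("stats query failed");
///     println!("{}: {} bytes", stats.database_name, stats.size_bytes);
/// }
/// ```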
pub async fn get_database_stats(pool: &PgPool, db_name: &str) -> Result<DatabaseStats> {
    let size: (i64,) = sqlx::query_as("SELECT pg_database_size(current_database())")
        .fetch_one(pool)
        .await?;

    let table_count: (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'",
    )
    .fetch_one(pool)
    .await?;

    let index_count: (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM pg_indexes WHERE schemaname = 'public'")
            .fetch_one(pool)
            .await?;

    Ok(DatabaseStats {
        database_name: db_name.to_string(),
        size_bytes: size.0,
        table_count: table_count.0,
        index_count: index_count.0,
    })
}