1use crate::error::{DbError, Result};
4use chrono::{DateTime, Utc};
5use serde::{Deserialize, Serialize};
6use sqlx::PgPool;
7use std::path::{Path, PathBuf};
8use tokio::fs;
9use tokio::process::Command;
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct BackupConfig {
14 pub backup_dir: PathBuf,
16 pub database_name: String,
18 pub retention_days: u32,
20 pub max_backups: usize,
22 pub compress: bool,
24}
25
26impl Default for BackupConfig {
27 fn default() -> Self {
28 Self {
29 backup_dir: PathBuf::from("./backups"),
30 database_name: "kaccy".to_string(),
31 retention_days: 30,
32 max_backups: 10,
33 compress: true,
34 }
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct BackupMetadata {
41 pub file_path: PathBuf,
43 pub created_at: DateTime<Utc>,
45 pub database_name: String,
47 pub size_bytes: u64,
49 pub compressed: bool,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct PitrConfig {
56 pub wal_archive_dir: PathBuf,
58 pub base_backup_dir: PathBuf,
60 pub retention_days: u32,
62}
63
64impl Default for PitrConfig {
65 fn default() -> Self {
66 Self {
67 wal_archive_dir: PathBuf::from("./wal_archive"),
68 base_backup_dir: PathBuf::from("./base_backups"),
69 retention_days: 7,
70 }
71 }
72}
73
74pub struct BackupManager {
76 config: BackupConfig,
77}
78
79impl BackupManager {
80 pub fn new(config: BackupConfig) -> Self {
82 Self { config }
83 }
84
85 async fn ensure_backup_dir(&self) -> Result<()> {
87 if !self.config.backup_dir.exists() {
88 fs::create_dir_all(&self.config.backup_dir)
89 .await
90 .map_err(|e| DbError::Other(format!("Failed to create backup directory: {}", e)))?;
91 }
92 Ok(())
93 }
94
95 fn generate_backup_filename(&self) -> String {
97 let timestamp = Utc::now().format("%Y%m%d_%H%M%S");
98 let extension = if self.config.compress {
99 "sql.gz"
100 } else {
101 "sql"
102 };
103 format!("{}_{}.{}", self.config.database_name, timestamp, extension)
104 }
105
106 pub async fn create_backup(&self, database_url: &str) -> Result<BackupMetadata> {
108 self.ensure_backup_dir().await?;
109
110 let filename = self.generate_backup_filename();
111 let file_path = self.config.backup_dir.join(&filename);
112
113 let mut cmd = Command::new("pg_dump");
115 cmd.arg(database_url)
116 .arg("--format=plain")
117 .arg("--no-owner")
118 .arg("--no-acl");
119
120 if self.config.compress {
121 let output = cmd
123 .output()
124 .await
125 .map_err(|e| DbError::Other(format!("Failed to execute pg_dump: {}", e)))?;
126
127 if !output.status.success() {
128 let stderr = String::from_utf8_lossy(&output.stderr);
129 return Err(DbError::Other(format!("pg_dump failed: {}", stderr)));
130 }
131
132 let mut gzip_cmd = Command::new("gzip");
134 gzip_cmd
135 .arg("-c")
136 .stdin(std::process::Stdio::piped())
137 .stdout(std::process::Stdio::piped());
138
139 let mut child = gzip_cmd
140 .spawn()
141 .map_err(|e| DbError::Other(format!("Failed to spawn gzip: {}", e)))?;
142
143 if let Some(mut stdin) = child.stdin.take() {
144 use tokio::io::AsyncWriteExt;
145 stdin
146 .write_all(&output.stdout)
147 .await
148 .map_err(|e| DbError::Other(format!("Failed to write to gzip: {}", e)))?;
149 }
150
151 let output = child
152 .wait_with_output()
153 .await
154 .map_err(|e| DbError::Other(format!("Failed to wait for gzip: {}", e)))?;
155
156 fs::write(&file_path, output.stdout)
157 .await
158 .map_err(|e| DbError::Other(format!("Failed to write backup file: {}", e)))?;
159 } else {
160 cmd.arg(format!("--file={}", file_path.display()));
161
162 let output = cmd
163 .output()
164 .await
165 .map_err(|e| DbError::Other(format!("Failed to execute pg_dump: {}", e)))?;
166
167 if !output.status.success() {
168 let stderr = String::from_utf8_lossy(&output.stderr);
169 return Err(DbError::Other(format!("pg_dump failed: {}", stderr)));
170 }
171 }
172
173 let metadata = fs::metadata(&file_path)
174 .await
175 .map_err(|e| DbError::Other(format!("Failed to read backup metadata: {}", e)))?;
176
177 Ok(BackupMetadata {
178 file_path,
179 created_at: Utc::now(),
180 database_name: self.config.database_name.clone(),
181 size_bytes: metadata.len(),
182 compressed: self.config.compress,
183 })
184 }
185
186 pub async fn list_backups(&self) -> Result<Vec<BackupMetadata>> {
188 if !self.config.backup_dir.exists() {
189 return Ok(Vec::new());
190 }
191
192 let mut backups = Vec::new();
193 let mut entries = fs::read_dir(&self.config.backup_dir)
194 .await
195 .map_err(|e| DbError::Other(format!("Failed to read backup directory: {}", e)))?;
196
197 while let Some(entry) = entries
198 .next_entry()
199 .await
200 .map_err(|e| DbError::Other(format!("Failed to read directory entry: {}", e)))?
201 {
202 let path = entry.path();
203 if path.is_file() {
204 let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
205
206 if file_name.starts_with(&self.config.database_name)
207 && (file_name.ends_with(".sql") || file_name.ends_with(".sql.gz"))
208 {
209 let is_compressed = file_name.ends_with(".gz");
210 let metadata = fs::metadata(&path).await.map_err(|e| {
211 DbError::Other(format!("Failed to read file metadata: {}", e))
212 })?;
213
214 backups.push(BackupMetadata {
215 file_path: path,
216 created_at: metadata
217 .created()
218 .ok()
219 .and_then(|t| {
220 DateTime::from_timestamp(
221 t.duration_since(std::time::UNIX_EPOCH).ok()?.as_secs() as i64,
222 0,
223 )
224 })
225 .unwrap_or_else(Utc::now),
226 database_name: self.config.database_name.clone(),
227 size_bytes: metadata.len(),
228 compressed: is_compressed,
229 });
230 }
231 }
232 }
233
234 backups.sort_by(|a, b| b.created_at.cmp(&a.created_at));
236
237 Ok(backups)
238 }
239
240 pub async fn restore_backup(&self, backup_path: &Path, database_url: &str) -> Result<()> {
242 if !backup_path.exists() {
243 return Err(DbError::Other("Backup file not found".to_string()));
244 }
245
246 let compressed = backup_path
247 .extension()
248 .and_then(|e| e.to_str())
249 .map(|e| e == "gz")
250 .unwrap_or(false);
251
252 if compressed {
253 let mut gunzip_cmd = Command::new("gunzip");
255 gunzip_cmd.arg("-c").arg(backup_path);
256
257 let gunzip_output = gunzip_cmd
258 .output()
259 .await
260 .map_err(|e| DbError::Other(format!("Failed to execute gunzip: {}", e)))?;
261
262 if !gunzip_output.status.success() {
263 let stderr = String::from_utf8_lossy(&gunzip_output.stderr);
264 return Err(DbError::Other(format!("gunzip failed: {}", stderr)));
265 }
266
267 let mut psql_cmd = Command::new("psql");
268 psql_cmd
269 .arg(database_url)
270 .stdin(std::process::Stdio::piped());
271
272 let mut child = psql_cmd
273 .spawn()
274 .map_err(|e| DbError::Other(format!("Failed to spawn psql: {}", e)))?;
275
276 if let Some(mut stdin) = child.stdin.take() {
277 use tokio::io::AsyncWriteExt;
278 stdin
279 .write_all(&gunzip_output.stdout)
280 .await
281 .map_err(|e| DbError::Other(format!("Failed to write to psql: {}", e)))?;
282 }
283
284 let output = child
285 .wait_with_output()
286 .await
287 .map_err(|e| DbError::Other(format!("Failed to wait for psql: {}", e)))?;
288
289 if !output.status.success() {
290 let stderr = String::from_utf8_lossy(&output.stderr);
291 return Err(DbError::Other(format!("psql failed: {}", stderr)));
292 }
293 } else {
294 let mut cmd = Command::new("psql");
296 cmd.arg(database_url).arg("-f").arg(backup_path);
297
298 let output = cmd
299 .output()
300 .await
301 .map_err(|e| DbError::Other(format!("Failed to execute psql: {}", e)))?;
302
303 if !output.status.success() {
304 let stderr = String::from_utf8_lossy(&output.stderr);
305 return Err(DbError::Other(format!("psql failed: {}", stderr)));
306 }
307 }
308
309 Ok(())
310 }
311
312 pub async fn cleanup_old_backups(&self) -> Result<Vec<PathBuf>> {
314 let backups = self.list_backups().await?;
315 let mut deleted = Vec::new();
316
317 let cutoff_date = Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
318
319 for (i, backup) in backups.iter().enumerate() {
320 if backup.created_at < cutoff_date || i >= self.config.max_backups {
322 fs::remove_file(&backup.file_path)
323 .await
324 .map_err(|e| DbError::Other(format!("Failed to delete backup: {}", e)))?;
325 deleted.push(backup.file_path.clone());
326 }
327 }
328
329 Ok(deleted)
330 }
331
332 pub async fn get_total_backup_size(&self) -> Result<u64> {
334 let backups = self.list_backups().await?;
335 Ok(backups.iter().map(|b| b.size_bytes).sum())
336 }
337}
338
339pub async fn verify_database_integrity(pool: &PgPool) -> Result<bool> {
341 let result: (bool,) = sqlx::query_as("SELECT true").fetch_one(pool).await?;
343
344 if !result.0 {
345 return Ok(false);
346 }
347
348 let tables = vec![
350 "users",
351 "tokens",
352 "balances",
353 "orders",
354 "trades",
355 "audit_logs",
356 ];
357 for table in tables {
358 let count: (i64,) = sqlx::query_as(&format!(
359 "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{}'",
360 table
361 ))
362 .fetch_one(pool)
363 .await?;
364
365 if count.0 == 0 {
366 return Ok(false);
367 }
368 }
369
370 Ok(true)
371}
372
373#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
375pub struct DatabaseStats {
376 pub database_name: String,
377 pub size_bytes: i64,
378 pub table_count: i64,
379 pub index_count: i64,
380}
381
382pub async fn get_database_stats(pool: &PgPool, db_name: &str) -> Result<DatabaseStats> {
383 let size: (i64,) = sqlx::query_as("SELECT pg_database_size(current_database())")
384 .fetch_one(pool)
385 .await?;
386
387 let table_count: (i64,) = sqlx::query_as(
388 "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public'",
389 )
390 .fetch_one(pool)
391 .await?;
392
393 let index_count: (i64,) =
394 sqlx::query_as("SELECT COUNT(*) FROM pg_indexes WHERE schemaname = 'public'")
395 .fetch_one(pool)
396 .await?;
397
398 Ok(DatabaseStats {
399 database_name: db_name.to_string(),
400 size_bytes: size.0,
401 table_count: table_count.0,
402 index_count: index_count.0,
403 })
404}