1use super::{BackupConfig, BackupError, Result};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use sqlx::PgPool;
5use std::path::{Path, PathBuf};
6use std::process::Command;
7use std::sync::Arc;
8use tokio::fs;
9use tracing::{debug, error, info, warn};
10
/// Archives PostgreSQL WAL files into a local directory and tracks archiving progress.
///
/// Cloning is cheap: the database pool and the status are behind `Arc`, so every clone
/// (including the one moved into the background monitoring task) shares the same
/// [`ArchiveStatus`].
pub struct WalArchiver {
    /// Backup settings (archive directory, retention period, encryption flag, ...).
    config: BackupConfig,
    /// Pool used to query server-side archiving settings and `pg_stat_archiver`.
    db_pool: Arc<PgPool>,
    /// Progress shared between clones of the archiver and its monitoring loop.
    archive_status: Arc<tokio::sync::RwLock<ArchiveStatus>>,
}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ArchiveStatus {
20 pub is_archiving: bool,
21 pub last_archived_wal: Option<String>,
22 pub last_archive_time: Option<DateTime<Utc>>,
23 pub archived_wal_count: u64,
24 pub failed_archives: u64,
25 pub archive_lag_seconds: u64,
26}
27
/// Description of a single WAL file handled by the archiver.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalFile {
    /// WAL file name, e.g. `000000010000000000000001`.
    pub name: String,
    /// Location of the file on disk. Depending on which call produced this value, this is
    /// either the archived copy or the restored copy.
    pub path: PathBuf,
    /// File size in bytes, read from filesystem metadata.
    pub size_bytes: u64,
    /// Creation timestamp: the current time for a freshly archived file, otherwise derived
    /// from filesystem metadata when the archive directory is listed.
    pub created_time: DateTime<Utc>,
    /// When the file was archived, if known.
    pub archived_time: Option<DateTime<Utc>>,
    /// Hex-encoded SHA-256 digest of the file contents.
    pub checksum: String,
}
37
38impl WalArchiver {
39 pub fn new(config: BackupConfig, db_pool: Arc<PgPool>) -> Self {
40 let archive_status = Arc::new(tokio::sync::RwLock::new(ArchiveStatus {
41 is_archiving: false,
42 last_archived_wal: None,
43 last_archive_time: None,
44 archived_wal_count: 0,
45 failed_archives: 0,
46 archive_lag_seconds: 0,
47 }));
48
49 Self {
50 config,
51 db_pool,
52 archive_status,
53 }
54 }
55
56 pub async fn initialize(&self) -> Result<()> {
58 info!("Initializing WAL archiver");
59
60 fs::create_dir_all(&self.config.wal_archive_directory).await?;
62
63 self.verify_archiving_config().await?;
65
66 self.configure_archive_command().await?;
68
69 info!("WAL archiver initialized successfully");
70 Ok(())
71 }
72
73 pub async fn start_archiving(&self) -> Result<()> {
75 info!("Starting WAL archiving");
76
77 {
78 let mut status = self.archive_status.write().await;
79 status.is_archiving = true;
80 }
81
82 self.enable_postgresql_archiving().await?;
84
85 let archiver = self.clone();
87 tokio::spawn(async move {
88 archiver.archive_monitoring_loop().await;
89 });
90
91 info!("WAL archiving started");
92 Ok(())
93 }
94
95 pub async fn stop_archiving(&self) -> Result<()> {
97 info!("Stopping WAL archiving");
98
99 {
100 let mut status = self.archive_status.write().await;
101 status.is_archiving = false;
102 }
103
104 self.disable_postgresql_archiving().await?;
106
107 info!("WAL archiving stopped");
108 Ok(())
109 }
110
111 pub async fn archive_wal_file(&self, wal_filename: &str, wal_path: &Path) -> Result<WalFile> {
113 debug!("Archiving WAL file: {}", wal_filename);
114
115 let archive_path = self.config.wal_archive_directory.join(wal_filename);
116
117 fs::copy(wal_path, &archive_path).await?;
119
120 let checksum = self.calculate_wal_checksum(&archive_path).await?;
122
123 let metadata = fs::metadata(&archive_path).await?;
125 let file_size = metadata.len();
126
127 let wal_file = WalFile {
128 name: wal_filename.to_string(),
129 path: archive_path,
130 size_bytes: file_size,
131 created_time: Utc::now(), archived_time: Some(Utc::now()),
133 checksum,
134 };
135
136 {
138 let mut status = self.archive_status.write().await;
139 status.last_archived_wal = Some(wal_filename.to_string());
140 status.last_archive_time = Some(Utc::now());
141 status.archived_wal_count += 1;
142 }
143
144 if self.should_compress_wal() {
146 self.compress_wal_file(&wal_file.path).await?;
147 }
148
149 if self.config.enable_encryption {
151 self.encrypt_wal_file(&wal_file.path).await?;
152 }
153
154 info!(
155 "WAL file archived successfully: {} ({} bytes)",
156 wal_filename, file_size
157 );
158 Ok(wal_file)
159 }
160
161 pub async fn get_archived_wal_files(&self) -> Result<Vec<WalFile>> {
163 debug!("Getting list of archived WAL files");
164
165 let mut wal_files = Vec::new();
166 let mut entries = fs::read_dir(&self.config.wal_archive_directory).await?;
167
168 while let Some(entry) = entries.next_entry().await? {
169 let path = entry.path();
170 if path.is_file() {
171 if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
172 if self.is_wal_filename(filename) {
174 let metadata = fs::metadata(&path).await?;
175 let checksum = self.calculate_wal_checksum(&path).await?;
176
177 let wal_file = WalFile {
178 name: filename.to_string(),
179 path: path.clone(),
180 size_bytes: metadata.len(),
181 created_time: DateTime::from_timestamp(
182 metadata
183 .created()
184 .unwrap_or(std::time::UNIX_EPOCH)
185 .duration_since(std::time::UNIX_EPOCH)
186 .unwrap()
187 .as_secs() as i64,
188 0,
189 )
190 .unwrap_or_else(Utc::now),
191 archived_time: Some(
192 DateTime::from_timestamp(
193 metadata
194 .modified()
195 .unwrap_or(std::time::UNIX_EPOCH)
196 .duration_since(std::time::UNIX_EPOCH)
197 .unwrap()
198 .as_secs() as i64,
199 0,
200 )
201 .unwrap_or_else(Utc::now),
202 ),
203 checksum,
204 };
205
206 wal_files.push(wal_file);
207 }
208 }
209 }
210 }
211
212 wal_files.sort_by(|a, b| a.created_time.cmp(&b.created_time));
214
215 debug!("Found {} archived WAL files", wal_files.len());
216 Ok(wal_files)
217 }
218
219 pub async fn cleanup_old_wal_files(&self) -> Result<u32> {
221 info!("Starting WAL archive cleanup");
222
223 let wal_files = self.get_archived_wal_files().await?;
224 let retention_cutoff =
225 Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
226
227 let mut cleanup_count = 0;
228
229 for wal_file in wal_files {
230 if wal_file.archived_time.unwrap_or(Utc::now()) < retention_cutoff {
231 match fs::remove_file(&wal_file.path).await {
232 Ok(_) => {
233 cleanup_count += 1;
234 debug!("Deleted old WAL file: {}", wal_file.name);
235 }
236 Err(e) => {
237 error!("Failed to delete old WAL file {}: {}", wal_file.name, e);
238 }
239 }
240 }
241 }
242
243 info!(
244 "WAL archive cleanup completed: {} files deleted",
245 cleanup_count
246 );
247 Ok(cleanup_count)
248 }
249
250 pub async fn get_archive_status(&self) -> ArchiveStatus {
252 self.archive_status.read().await.clone()
253 }
254
255 pub async fn restore_wal_files(
257 &self,
258 target_lsn: &str,
259 restore_directory: &Path,
260 ) -> Result<Vec<WalFile>> {
261 info!("Restoring WAL files up to LSN: {}", target_lsn);
262
263 let wal_files = self.get_archived_wal_files().await?;
264 let mut restored_files = Vec::new();
265
266 fs::create_dir_all(restore_directory).await?;
268
269 for wal_file in wal_files {
270 let restore_path = restore_directory.join(&wal_file.name);
273
274 let source_path = if self.config.enable_encryption {
276 let decrypted_path = self.decrypt_wal_file(&wal_file.path).await?;
277 decrypted_path
278 } else {
279 wal_file.path.clone()
280 };
281
282 let final_source = if self.is_compressed_wal(&source_path) {
284 self.decompress_wal_file(&source_path).await?
285 } else {
286 source_path
287 };
288
289 fs::copy(&final_source, &restore_path).await?;
290
291 let mut restored_wal = wal_file.clone();
292 restored_wal.path = restore_path;
293 restored_files.push(restored_wal);
294 }
295
296 info!("Restored {} WAL files for recovery", restored_files.len());
297 Ok(restored_files)
298 }
299
300 async fn verify_archiving_config(&self) -> Result<()> {
303 debug!("Verifying PostgreSQL archiving configuration");
304
305 let wal_level: String = sqlx::query_scalar("SHOW wal_level")
307 .fetch_one(self.db_pool.as_ref())
308 .await?;
309
310 if wal_level != "replica" && wal_level != "logical" {
311 return Err(BackupError::ConfigurationError {
312 message: format!(
313 "WAL level must be 'replica' or 'logical' for archiving, found: {wal_level}"
314 ),
315 });
316 }
317
318 debug!("WAL archiving configuration verified");
319 Ok(())
320 }
321
322 async fn configure_archive_command(&self) -> Result<()> {
323 debug!("Configuring PostgreSQL archive command");
324
325 let archive_command: Option<String> = sqlx::query_scalar("SHOW archive_command")
328 .fetch_optional(self.db_pool.as_ref())
329 .await?;
330
331 match archive_command {
332 Some(cmd) if !cmd.is_empty() && cmd != "false" => {
333 info!("Archive command already configured: {}", cmd);
334 }
335 _ => {
336 warn!("Archive command not configured. WAL archiving may not work properly.");
337 }
338 }
339
340 Ok(())
341 }
342
343 async fn enable_postgresql_archiving(&self) -> Result<()> {
344 debug!("Enabling PostgreSQL archiving");
345
346 let archive_mode: String = sqlx::query_scalar("SHOW archive_mode")
348 .fetch_one(self.db_pool.as_ref())
349 .await?;
350
351 if archive_mode != "on" {
352 warn!("Archive mode is not enabled. This requires a PostgreSQL restart to change.");
353 }
355
356 Ok(())
357 }
358
359 async fn disable_postgresql_archiving(&self) -> Result<()> {
360 debug!("Disabling PostgreSQL archiving");
361 Ok(())
364 }
365
366 async fn archive_monitoring_loop(&self) {
367 info!("Starting WAL archive monitoring loop");
368
369 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
370
371 loop {
372 interval.tick().await;
373
374 let is_archiving = {
375 let status = self.archive_status.read().await;
376 status.is_archiving
377 };
378
379 if !is_archiving {
380 break;
381 }
382
383 if let Err(e) = self.check_archive_lag().await {
385 error!("Error checking archive lag: {}", e);
386 }
387 }
388
389 info!("WAL archive monitoring loop stopped");
390 }
391
392 async fn check_archive_lag(&self) -> Result<()> {
393 let _current_lsn: String = sqlx::query_scalar("SELECT pg_current_wal_lsn()")
395 .fetch_one(self.db_pool.as_ref())
396 .await?;
397
398 let last_archived_lsn: Option<String> =
399 sqlx::query_scalar("SELECT last_archived_wal FROM pg_stat_archiver")
400 .fetch_optional(self.db_pool.as_ref())
401 .await?;
402
403 let lag_seconds = if let Some(_last_lsn) = last_archived_lsn {
405 0
407 } else {
408 300 };
411
412 {
414 let mut status = self.archive_status.write().await;
415 status.archive_lag_seconds = lag_seconds;
416 }
417
418 if lag_seconds > 300 {
419 warn!("High WAL archive lag detected: {} seconds", lag_seconds);
421 }
422
423 Ok(())
424 }
425
426 async fn calculate_wal_checksum(&self, wal_path: &Path) -> Result<String> {
427 use sha2::{Digest, Sha256};
428
429 let contents = fs::read(wal_path).await?;
430 let mut hasher = Sha256::new();
431 hasher.update(&contents);
432 let result = hasher.finalize();
433 Ok(format!("{result:x}"))
434 }
435
436 fn should_compress_wal(&self) -> bool {
437 Self::should_compress_wal_static()
438 }
439
440 fn should_compress_wal_static() -> bool {
441 true
443 }
444
445 async fn compress_wal_file(&self, wal_path: &Path) -> Result<PathBuf> {
446 debug!("Compressing WAL file: {}", wal_path.display());
447
448 let compressed_path = wal_path.with_extension("gz");
449
450 let mut cmd = Command::new("gzip");
451 cmd.arg("--force").arg(wal_path);
452
453 let output = cmd.output().map_err(|e| BackupError::BackupFailed {
454 message: format!("Failed to compress WAL file: {e}"),
455 })?;
456
457 if !output.status.success() {
458 let error_msg = String::from_utf8_lossy(&output.stderr);
459 return Err(BackupError::BackupFailed {
460 message: format!("WAL compression failed: {error_msg}"),
461 });
462 }
463
464 Ok(compressed_path)
465 }
466
467 async fn encrypt_wal_file(&self, wal_path: &Path) -> Result<PathBuf> {
468 debug!("Encrypting WAL file: {}", wal_path.display());
469
470 Ok(wal_path.to_path_buf())
473 }
474
475 async fn decrypt_wal_file(&self, encrypted_path: &Path) -> Result<PathBuf> {
476 debug!("Decrypting WAL file: {}", encrypted_path.display());
477
478 Ok(encrypted_path.to_path_buf())
481 }
482
483 fn is_compressed_wal(&self, path: &Path) -> bool {
484 Self::is_compressed_wal_static(path)
485 }
486
487 fn is_compressed_wal_static(path: &Path) -> bool {
488 path.extension().and_then(|ext| ext.to_str()) == Some("gz")
489 }
490
491 async fn decompress_wal_file(&self, compressed_path: &Path) -> Result<PathBuf> {
492 debug!("Decompressing WAL file: {}", compressed_path.display());
493
494 let decompressed_path = compressed_path.with_extension("");
495
496 let mut cmd = Command::new("gunzip");
497 cmd.arg("--force").arg("--keep").arg(compressed_path);
498
499 let output = cmd.output().map_err(|e| BackupError::BackupFailed {
500 message: format!("Failed to decompress WAL file: {e}"),
501 })?;
502
503 if !output.status.success() {
504 let error_msg = String::from_utf8_lossy(&output.stderr);
505 return Err(BackupError::BackupFailed {
506 message: format!("WAL decompression failed: {error_msg}"),
507 });
508 }
509
510 Ok(decompressed_path)
511 }
512
513 fn is_wal_filename(&self, filename: &str) -> bool {
514 Self::is_wal_filename_static(filename)
515 }
516
517 fn is_wal_filename_static(filename: &str) -> bool {
518 filename.len() == 24 && filename.chars().all(|c| c.is_ascii_hexdigit())
521 || filename.contains(".partial") || filename.contains(".backup") }
524}
525
526impl Clone for WalArchiver {
527 fn clone(&self) -> Self {
528 Self {
529 config: self.config.clone(),
530 db_pool: self.db_pool.clone(),
531 archive_status: self.archive_status.clone(),
532 }
533 }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    /// Segment names and `.partial` segments are WAL files; arbitrary names are not.
    #[test]
    fn test_wal_filename_validation() {
        let accepted = [
            "000000010000000000000001",
            "000000010000000000000001.partial",
        ];
        let rejected = ["invalid_filename", "short"];

        for name in &accepted {
            assert!(
                WalArchiver::is_wal_filename_static(name),
                "{name} should be recognized as a WAL file"
            );
        }
        for name in &rejected {
            assert!(
                !WalArchiver::is_wal_filename_static(name),
                "{name} should not be recognized as a WAL file"
            );
        }
    }

    /// A freshly built idle status reports no archiving activity.
    #[test]
    fn test_archive_status_default() {
        let status = ArchiveStatus {
            is_archiving: false,
            last_archived_wal: None,
            last_archive_time: None,
            archived_wal_count: 0,
            failed_archives: 0,
            archive_lag_seconds: 0,
        };

        let ArchiveStatus {
            is_archiving,
            archived_wal_count,
            ..
        } = status;
        assert!(!is_archiving);
        assert_eq!(archived_wal_count, 0);
    }

    /// Compression is enabled for archived WAL.
    #[test]
    fn test_should_compress_wal() {
        let compress = WalArchiver::should_compress_wal_static();
        assert!(compress);
    }

    /// Only the `.gz` extension marks a file as compressed.
    #[test]
    fn test_is_compressed_wal() {
        let cases = [("test.gz", true), ("test.txt", false)];

        for &(file, expected) in &cases {
            assert_eq!(
                WalArchiver::is_compressed_wal_static(Path::new(file)),
                expected,
                "unexpected compression detection for {file}"
            );
        }
    }
}