// codex_memory/backup/wal_archiver.rs

1use super::{BackupConfig, BackupError, Result};
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use sqlx::PgPool;
5use std::path::{Path, PathBuf};
6use std::process::Command;
7use std::sync::Arc;
8use tokio::fs;
9use tracing::{debug, error, info, warn};
10
11/// WAL (Write-Ahead Log) archiver for continuous backup capability
12pub struct WalArchiver {
13    config: BackupConfig,
14    db_pool: Arc<PgPool>,
15    archive_status: Arc<tokio::sync::RwLock<ArchiveStatus>>,
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ArchiveStatus {
20    pub is_archiving: bool,
21    pub last_archived_wal: Option<String>,
22    pub last_archive_time: Option<DateTime<Utc>>,
23    pub archived_wal_count: u64,
24    pub failed_archives: u64,
25    pub archive_lag_seconds: u64,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct WalFile {
30    pub name: String,
31    pub path: PathBuf,
32    pub size_bytes: u64,
33    pub created_time: DateTime<Utc>,
34    pub archived_time: Option<DateTime<Utc>>,
35    pub checksum: String,
36}
37
38impl WalArchiver {
39    pub fn new(config: BackupConfig, db_pool: Arc<PgPool>) -> Self {
40        let archive_status = Arc::new(tokio::sync::RwLock::new(ArchiveStatus {
41            is_archiving: false,
42            last_archived_wal: None,
43            last_archive_time: None,
44            archived_wal_count: 0,
45            failed_archives: 0,
46            archive_lag_seconds: 0,
47        }));
48
49        Self {
50            config,
51            db_pool,
52            archive_status,
53        }
54    }
55
56    /// Initialize WAL archiving
57    pub async fn initialize(&self) -> Result<()> {
58        info!("Initializing WAL archiver");
59
60        // Create WAL archive directory
61        fs::create_dir_all(&self.config.wal_archive_directory).await?;
62
63        // Verify PostgreSQL archiving configuration
64        self.verify_archiving_config().await?;
65
66        // Set up archive command if not configured
67        self.configure_archive_command().await?;
68
69        info!("WAL archiver initialized successfully");
70        Ok(())
71    }
72
73    /// Start continuous WAL archiving
74    pub async fn start_archiving(&self) -> Result<()> {
75        info!("Starting WAL archiving");
76
77        {
78            let mut status = self.archive_status.write().await;
79            status.is_archiving = true;
80        }
81
82        // Enable archiving in PostgreSQL
83        self.enable_postgresql_archiving().await?;
84
85        // Start monitoring task
86        let archiver = self.clone();
87        tokio::spawn(async move {
88            archiver.archive_monitoring_loop().await;
89        });
90
91        info!("WAL archiving started");
92        Ok(())
93    }
94
95    /// Stop WAL archiving
96    pub async fn stop_archiving(&self) -> Result<()> {
97        info!("Stopping WAL archiving");
98
99        {
100            let mut status = self.archive_status.write().await;
101            status.is_archiving = false;
102        }
103
104        // Disable archiving in PostgreSQL
105        self.disable_postgresql_archiving().await?;
106
107        info!("WAL archiving stopped");
108        Ok(())
109    }
110
111    /// Archive a specific WAL file
112    pub async fn archive_wal_file(&self, wal_filename: &str, wal_path: &Path) -> Result<WalFile> {
113        debug!("Archiving WAL file: {}", wal_filename);
114
115        let archive_path = self.config.wal_archive_directory.join(wal_filename);
116
117        // Copy WAL file to archive directory
118        fs::copy(wal_path, &archive_path).await?;
119
120        // Calculate checksum
121        let checksum = self.calculate_wal_checksum(&archive_path).await?;
122
123        // Get file metadata
124        let metadata = fs::metadata(&archive_path).await?;
125        let file_size = metadata.len();
126
127        let wal_file = WalFile {
128            name: wal_filename.to_string(),
129            path: archive_path,
130            size_bytes: file_size,
131            created_time: Utc::now(), // This would be extracted from actual WAL file in production
132            archived_time: Some(Utc::now()),
133            checksum,
134        };
135
136        // Update archive status
137        {
138            let mut status = self.archive_status.write().await;
139            status.last_archived_wal = Some(wal_filename.to_string());
140            status.last_archive_time = Some(Utc::now());
141            status.archived_wal_count += 1;
142        }
143
144        // Optionally compress the WAL file
145        if self.should_compress_wal() {
146            self.compress_wal_file(&wal_file.path).await?;
147        }
148
149        // Optionally encrypt the WAL file
150        if self.config.enable_encryption {
151            self.encrypt_wal_file(&wal_file.path).await?;
152        }
153
154        info!(
155            "WAL file archived successfully: {} ({} bytes)",
156            wal_filename, file_size
157        );
158        Ok(wal_file)
159    }
160
161    /// Get list of archived WAL files
162    pub async fn get_archived_wal_files(&self) -> Result<Vec<WalFile>> {
163        debug!("Getting list of archived WAL files");
164
165        let mut wal_files = Vec::new();
166        let mut entries = fs::read_dir(&self.config.wal_archive_directory).await?;
167
168        while let Some(entry) = entries.next_entry().await? {
169            let path = entry.path();
170            if path.is_file() {
171                if let Some(filename) = path.file_name().and_then(|n| n.to_str()) {
172                    // Check if this looks like a WAL file
173                    if self.is_wal_filename(filename) {
174                        let metadata = fs::metadata(&path).await?;
175                        let checksum = self.calculate_wal_checksum(&path).await?;
176
177                        let wal_file = WalFile {
178                            name: filename.to_string(),
179                            path: path.clone(),
180                            size_bytes: metadata.len(),
181                            created_time: DateTime::from_timestamp(
182                                metadata
183                                    .created()
184                                    .unwrap_or(std::time::UNIX_EPOCH)
185                                    .duration_since(std::time::UNIX_EPOCH)
186                                    .unwrap()
187                                    .as_secs() as i64,
188                                0,
189                            )
190                            .unwrap_or_else(Utc::now),
191                            archived_time: Some(
192                                DateTime::from_timestamp(
193                                    metadata
194                                        .modified()
195                                        .unwrap_or(std::time::UNIX_EPOCH)
196                                        .duration_since(std::time::UNIX_EPOCH)
197                                        .unwrap()
198                                        .as_secs() as i64,
199                                    0,
200                                )
201                                .unwrap_or_else(Utc::now),
202                            ),
203                            checksum,
204                        };
205
206                        wal_files.push(wal_file);
207                    }
208                }
209            }
210        }
211
212        // Sort by creation time
213        wal_files.sort_by(|a, b| a.created_time.cmp(&b.created_time));
214
215        debug!("Found {} archived WAL files", wal_files.len());
216        Ok(wal_files)
217    }
218
219    /// Clean up old WAL archives based on retention policy
220    pub async fn cleanup_old_wal_files(&self) -> Result<u32> {
221        info!("Starting WAL archive cleanup");
222
223        let wal_files = self.get_archived_wal_files().await?;
224        let retention_cutoff =
225            Utc::now() - chrono::Duration::days(self.config.retention_days as i64);
226
227        let mut cleanup_count = 0;
228
229        for wal_file in wal_files {
230            if wal_file.archived_time.unwrap_or(Utc::now()) < retention_cutoff {
231                match fs::remove_file(&wal_file.path).await {
232                    Ok(_) => {
233                        cleanup_count += 1;
234                        debug!("Deleted old WAL file: {}", wal_file.name);
235                    }
236                    Err(e) => {
237                        error!("Failed to delete old WAL file {}: {}", wal_file.name, e);
238                    }
239                }
240            }
241        }
242
243        info!(
244            "WAL archive cleanup completed: {} files deleted",
245            cleanup_count
246        );
247        Ok(cleanup_count)
248    }
249
250    /// Get current archive status
251    pub async fn get_archive_status(&self) -> ArchiveStatus {
252        self.archive_status.read().await.clone()
253    }
254
255    /// Restore WAL files for point-in-time recovery
256    pub async fn restore_wal_files(
257        &self,
258        target_lsn: &str,
259        restore_directory: &Path,
260    ) -> Result<Vec<WalFile>> {
261        info!("Restoring WAL files up to LSN: {}", target_lsn);
262
263        let wal_files = self.get_archived_wal_files().await?;
264        let mut restored_files = Vec::new();
265
266        // Create restore directory
267        fs::create_dir_all(restore_directory).await?;
268
269        for wal_file in wal_files {
270            // In a real implementation, we'd parse the LSN from WAL file names
271            // and only restore files needed for the target LSN
272            let restore_path = restore_directory.join(&wal_file.name);
273
274            // Decrypt if encrypted
275            let source_path = if self.config.enable_encryption {
276                let decrypted_path = self.decrypt_wal_file(&wal_file.path).await?;
277                decrypted_path
278            } else {
279                wal_file.path.clone()
280            };
281
282            // Decompress if compressed
283            let final_source = if self.is_compressed_wal(&source_path) {
284                self.decompress_wal_file(&source_path).await?
285            } else {
286                source_path
287            };
288
289            fs::copy(&final_source, &restore_path).await?;
290
291            let mut restored_wal = wal_file.clone();
292            restored_wal.path = restore_path;
293            restored_files.push(restored_wal);
294        }
295
296        info!("Restored {} WAL files for recovery", restored_files.len());
297        Ok(restored_files)
298    }
299
300    // Private helper methods
301
302    async fn verify_archiving_config(&self) -> Result<()> {
303        debug!("Verifying PostgreSQL archiving configuration");
304
305        // Check WAL level
306        let wal_level: String = sqlx::query_scalar("SHOW wal_level")
307            .fetch_one(self.db_pool.as_ref())
308            .await?;
309
310        if wal_level != "replica" && wal_level != "logical" {
311            return Err(BackupError::ConfigurationError {
312                message: format!(
313                    "WAL level must be 'replica' or 'logical' for archiving, found: {wal_level}"
314                ),
315            });
316        }
317
318        debug!("WAL archiving configuration verified");
319        Ok(())
320    }
321
322    async fn configure_archive_command(&self) -> Result<()> {
323        debug!("Configuring PostgreSQL archive command");
324
325        // In a production system, this would set up the archive_command
326        // For now, we'll just verify the current setting
327        let archive_command: Option<String> = sqlx::query_scalar("SHOW archive_command")
328            .fetch_optional(self.db_pool.as_ref())
329            .await?;
330
331        match archive_command {
332            Some(cmd) if !cmd.is_empty() && cmd != "false" => {
333                info!("Archive command already configured: {}", cmd);
334            }
335            _ => {
336                warn!("Archive command not configured. WAL archiving may not work properly.");
337            }
338        }
339
340        Ok(())
341    }
342
343    async fn enable_postgresql_archiving(&self) -> Result<()> {
344        debug!("Enabling PostgreSQL archiving");
345
346        // Check current archive mode
347        let archive_mode: String = sqlx::query_scalar("SHOW archive_mode")
348            .fetch_one(self.db_pool.as_ref())
349            .await?;
350
351        if archive_mode != "on" {
352            warn!("Archive mode is not enabled. This requires a PostgreSQL restart to change.");
353            // In production, this might trigger a configuration update and restart
354        }
355
356        Ok(())
357    }
358
359    async fn disable_postgresql_archiving(&self) -> Result<()> {
360        debug!("Disabling PostgreSQL archiving");
361        // This would typically involve updating PostgreSQL configuration
362        // For this implementation, we'll just log the action
363        Ok(())
364    }
365
366    async fn archive_monitoring_loop(&self) {
367        info!("Starting WAL archive monitoring loop");
368
369        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
370
371        loop {
372            interval.tick().await;
373
374            let is_archiving = {
375                let status = self.archive_status.read().await;
376                status.is_archiving
377            };
378
379            if !is_archiving {
380                break;
381            }
382
383            // Monitor archive lag
384            if let Err(e) = self.check_archive_lag().await {
385                error!("Error checking archive lag: {}", e);
386            }
387        }
388
389        info!("WAL archive monitoring loop stopped");
390    }
391
392    async fn check_archive_lag(&self) -> Result<()> {
393        // Query PostgreSQL for current WAL position and last archived position
394        let _current_lsn: String = sqlx::query_scalar("SELECT pg_current_wal_lsn()")
395            .fetch_one(self.db_pool.as_ref())
396            .await?;
397
398        let last_archived_lsn: Option<String> =
399            sqlx::query_scalar("SELECT last_archived_wal FROM pg_stat_archiver")
400                .fetch_optional(self.db_pool.as_ref())
401                .await?;
402
403        // Calculate lag (simplified - in production would parse LSN values)
404        let lag_seconds = if let Some(_last_lsn) = last_archived_lsn {
405            // This would calculate actual LSN difference
406            0
407        } else {
408            // No archived WAL yet
409            300 // 5 minutes default
410        };
411
412        // Update status
413        {
414            let mut status = self.archive_status.write().await;
415            status.archive_lag_seconds = lag_seconds;
416        }
417
418        if lag_seconds > 300 {
419            // More than 5 minutes lag
420            warn!("High WAL archive lag detected: {} seconds", lag_seconds);
421        }
422
423        Ok(())
424    }
425
426    async fn calculate_wal_checksum(&self, wal_path: &Path) -> Result<String> {
427        use sha2::{Digest, Sha256};
428
429        let contents = fs::read(wal_path).await?;
430        let mut hasher = Sha256::new();
431        hasher.update(&contents);
432        let result = hasher.finalize();
433        Ok(format!("{result:x}"))
434    }
435
436    fn should_compress_wal(&self) -> bool {
437        Self::should_compress_wal_static()
438    }
439
440    fn should_compress_wal_static() -> bool {
441        // Compress WAL files to save space
442        true
443    }
444
445    async fn compress_wal_file(&self, wal_path: &Path) -> Result<PathBuf> {
446        debug!("Compressing WAL file: {}", wal_path.display());
447
448        let compressed_path = wal_path.with_extension("gz");
449
450        let mut cmd = Command::new("gzip");
451        cmd.arg("--force").arg(wal_path);
452
453        let output = cmd.output().map_err(|e| BackupError::BackupFailed {
454            message: format!("Failed to compress WAL file: {e}"),
455        })?;
456
457        if !output.status.success() {
458            let error_msg = String::from_utf8_lossy(&output.stderr);
459            return Err(BackupError::BackupFailed {
460                message: format!("WAL compression failed: {error_msg}"),
461            });
462        }
463
464        Ok(compressed_path)
465    }
466
467    async fn encrypt_wal_file(&self, wal_path: &Path) -> Result<PathBuf> {
468        debug!("Encrypting WAL file: {}", wal_path.display());
469
470        // This would implement actual encryption using the configured key
471        // For now, just return the original path
472        Ok(wal_path.to_path_buf())
473    }
474
475    async fn decrypt_wal_file(&self, encrypted_path: &Path) -> Result<PathBuf> {
476        debug!("Decrypting WAL file: {}", encrypted_path.display());
477
478        // This would implement actual decryption
479        // For now, just return the original path
480        Ok(encrypted_path.to_path_buf())
481    }
482
483    fn is_compressed_wal(&self, path: &Path) -> bool {
484        Self::is_compressed_wal_static(path)
485    }
486
487    fn is_compressed_wal_static(path: &Path) -> bool {
488        path.extension().and_then(|ext| ext.to_str()) == Some("gz")
489    }
490
491    async fn decompress_wal_file(&self, compressed_path: &Path) -> Result<PathBuf> {
492        debug!("Decompressing WAL file: {}", compressed_path.display());
493
494        let decompressed_path = compressed_path.with_extension("");
495
496        let mut cmd = Command::new("gunzip");
497        cmd.arg("--force").arg("--keep").arg(compressed_path);
498
499        let output = cmd.output().map_err(|e| BackupError::BackupFailed {
500            message: format!("Failed to decompress WAL file: {e}"),
501        })?;
502
503        if !output.status.success() {
504            let error_msg = String::from_utf8_lossy(&output.stderr);
505            return Err(BackupError::BackupFailed {
506                message: format!("WAL decompression failed: {error_msg}"),
507            });
508        }
509
510        Ok(decompressed_path)
511    }
512
513    fn is_wal_filename(&self, filename: &str) -> bool {
514        Self::is_wal_filename_static(filename)
515    }
516
517    fn is_wal_filename_static(filename: &str) -> bool {
518        // Check if filename follows PostgreSQL WAL naming convention
519        // WAL files are typically 24 characters long hexadecimal names
520        filename.len() == 24 && filename.chars().all(|c| c.is_ascii_hexdigit())
521            || filename.contains(".partial") // Partial WAL files
522            || filename.contains(".backup") // Backup label files
523    }
524}
525
526impl Clone for WalArchiver {
527    fn clone(&self) -> Self {
528        Self {
529            config: self.config.clone(),
530            db_pool: self.db_pool.clone(),
531            archive_status: self.archive_status.clone(),
532        }
533    }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_wal_filename_validation() {
        // Pure filename-pattern checks; no database connection is needed.

        // Valid WAL segment names: exactly 24 hex characters, either case.
        assert!(WalArchiver::is_wal_filename_static(
            "000000010000000000000001"
        ));
        assert!(WalArchiver::is_wal_filename_static(
            "00000001000000000000000a"
        ));
        assert!(WalArchiver::is_wal_filename_static(
            "00000001000000000000000A"
        ));

        // Invalid WAL filenames
        assert!(!WalArchiver::is_wal_filename_static("invalid_filename"));
        assert!(!WalArchiver::is_wal_filename_static("short"));
        assert!(!WalArchiver::is_wal_filename_static(""));
        // One character too short / too long.
        assert!(!WalArchiver::is_wal_filename_static(
            "00000001000000000000001"
        ));
        assert!(!WalArchiver::is_wal_filename_static(
            "0000000100000000000000011"
        ));
        // Right length but not hexadecimal.
        assert!(!WalArchiver::is_wal_filename_static(
            "00000001000000000000000G"
        ));

        // Partial WAL files
        assert!(WalArchiver::is_wal_filename_static(
            "000000010000000000000001.partial"
        ));

        // Backup history files
        assert!(WalArchiver::is_wal_filename_static(
            "000000010000000000000002.00000028.backup"
        ));
    }

    #[test]
    fn test_archive_status_default() {
        let status = ArchiveStatus {
            is_archiving: false,
            last_archived_wal: None,
            last_archive_time: None,
            archived_wal_count: 0,
            failed_archives: 0,
            archive_lag_seconds: 0,
        };

        assert!(!status.is_archiving);
        assert_eq!(status.archived_wal_count, 0);
        assert_eq!(status.failed_archives, 0);
        assert!(status.last_archived_wal.is_none());
    }

    #[test]
    fn test_should_compress_wal() {
        // Static method; no database connection required.
        assert!(WalArchiver::should_compress_wal_static());
    }

    #[test]
    fn test_is_compressed_wal() {
        // Static method; no database connection required.
        assert!(WalArchiver::is_compressed_wal_static(Path::new("test.gz")));
        assert!(WalArchiver::is_compressed_wal_static(Path::new(
            "000000010000000000000001.partial.gz"
        )));
        assert!(!WalArchiver::is_compressed_wal_static(Path::new(
            "test.txt"
        )));
        assert!(!WalArchiver::is_compressed_wal_static(Path::new(
            "000000010000000000000001"
        )));
    }
}