codex_memory/backup/
repository.rs

1use super::{BackupError, BackupMetadata, BackupStatus, BackupType, Result};
2use async_trait::async_trait;
3use sqlx::{PgPool, Row};
4use std::sync::Arc;
5use tracing::debug;
6
7/// Repository abstraction for backup metadata operations
8#[async_trait]
9pub trait BackupRepository: Send + Sync + std::fmt::Debug {
10    async fn initialize(&self) -> Result<()>;
11    async fn store_metadata(&self, metadata: &BackupMetadata) -> Result<()>;
12    async fn update_metadata(&self, metadata: &BackupMetadata) -> Result<()>;
13    async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>>;
14    async fn get_expired_backups(&self, retention_days: u32) -> Result<Vec<BackupMetadata>>;
15    async fn get_recent_backups(&self, days: u32) -> Result<Vec<BackupMetadata>>;
16    async fn count_backups(&self) -> Result<u32>;
17    async fn mark_backup_expired(&self, backup_id: &str) -> Result<()>;
18    async fn get_current_wal_lsn(&self) -> Result<String>;
19    async fn verify_postgres_config(&self) -> Result<()>;
20}
21
22/// PostgreSQL implementation of backup repository
23#[derive(Debug)]
24pub struct PostgresBackupRepository {
25    db_pool: Arc<PgPool>,
26}
27
28impl PostgresBackupRepository {
29    pub fn new(db_pool: Arc<PgPool>) -> Self {
30        Self { db_pool }
31    }
32}
33
34#[async_trait]
35impl BackupRepository for PostgresBackupRepository {
36    async fn initialize(&self) -> Result<()> {
37        debug!("Initializing backup metadata store");
38
39        // Create backup_metadata table if it doesn't exist
40        sqlx::query(
41            r#"
42            CREATE TABLE IF NOT EXISTS backup_metadata (
43                id VARCHAR PRIMARY KEY,
44                backup_type VARCHAR NOT NULL,
45                status VARCHAR NOT NULL,
46                start_time TIMESTAMPTZ NOT NULL,
47                end_time TIMESTAMPTZ,
48                size_bytes BIGINT DEFAULT 0,
49                compressed_size_bytes BIGINT DEFAULT 0,
50                file_path TEXT NOT NULL,
51                checksum VARCHAR,
52                database_name VARCHAR NOT NULL,
53                wal_start_lsn VARCHAR,
54                wal_end_lsn VARCHAR,
55                encryption_enabled BOOLEAN DEFAULT false,
56                metadata JSONB DEFAULT '{}',
57                created_at TIMESTAMPTZ DEFAULT NOW()
58            )
59        "#,
60        )
61        .execute(self.db_pool.as_ref())
62        .await?;
63
64        debug!("Backup metadata store initialized");
65        Ok(())
66    }
67
68    async fn store_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
69        sqlx::query(
70            r#"
71            INSERT INTO backup_metadata (
72                id, backup_type, status, start_time, end_time, 
73                size_bytes, compressed_size_bytes, file_path, checksum,
74                database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
75            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
76        "#,
77        )
78        .bind(&metadata.id)
79        .bind(format!("{:?}", metadata.backup_type))
80        .bind(format!("{:?}", metadata.status))
81        .bind(metadata.start_time)
82        .bind(metadata.end_time)
83        .bind(metadata.size_bytes as i64)
84        .bind(metadata.compressed_size_bytes as i64)
85        .bind(metadata.file_path.to_string_lossy().as_ref())
86        .bind(&metadata.checksum)
87        .bind(&metadata.database_name)
88        .bind(&metadata.wal_start_lsn)
89        .bind(&metadata.wal_end_lsn)
90        .bind(metadata.encryption_enabled)
91        .execute(self.db_pool.as_ref())
92        .await?;
93
94        Ok(())
95    }
96
97    async fn update_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
98        sqlx::query(
99            r#"
100            UPDATE backup_metadata SET
101                status = $2, end_time = $3, size_bytes = $4,
102                compressed_size_bytes = $5, checksum = $6,
103                wal_end_lsn = $7
104            WHERE id = $1
105        "#,
106        )
107        .bind(&metadata.id)
108        .bind(format!("{:?}", metadata.status))
109        .bind(metadata.end_time)
110        .bind(metadata.size_bytes as i64)
111        .bind(metadata.compressed_size_bytes as i64)
112        .bind(&metadata.checksum)
113        .bind(&metadata.wal_end_lsn)
114        .execute(self.db_pool.as_ref())
115        .await?;
116
117        Ok(())
118    }
119
120    async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>> {
121        let row = sqlx::query(
122            r#"
123            SELECT id, backup_type, status, start_time, end_time,
124                   size_bytes, compressed_size_bytes, file_path, checksum,
125                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
126            FROM backup_metadata 
127            WHERE status = 'Completed'
128            ORDER BY start_time DESC 
129            LIMIT 1
130        "#,
131        )
132        .fetch_optional(self.db_pool.as_ref())
133        .await?;
134
135        if let Some(row) = row {
136            Ok(Some(self.row_to_metadata(row)?))
137        } else {
138            Ok(None)
139        }
140    }
141
142    async fn get_expired_backups(&self, retention_days: u32) -> Result<Vec<BackupMetadata>> {
143        let rows = sqlx::query(
144            r#"
145            SELECT id, backup_type, status, start_time, end_time,
146                   size_bytes, compressed_size_bytes, file_path, checksum,
147                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
148            FROM backup_metadata 
149            WHERE start_time < NOW() - INTERVAL '%d days'
150            AND status != 'Expired'
151        "#,
152        )
153        .bind(retention_days as i32)
154        .fetch_all(self.db_pool.as_ref())
155        .await?;
156
157        let mut backups = Vec::new();
158        for row in rows {
159            backups.push(self.row_to_metadata(row)?);
160        }
161
162        Ok(backups)
163    }
164
165    async fn get_recent_backups(&self, days: u32) -> Result<Vec<BackupMetadata>> {
166        let rows = sqlx::query(
167            r#"
168            SELECT id, backup_type, status, start_time, end_time,
169                   size_bytes, compressed_size_bytes, file_path, checksum,
170                   database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
171            FROM backup_metadata 
172            WHERE start_time > NOW() - INTERVAL '%d days'
173            ORDER BY start_time DESC
174        "#,
175        )
176        .bind(days as i32)
177        .fetch_all(self.db_pool.as_ref())
178        .await?;
179
180        let mut backups = Vec::new();
181        for row in rows {
182            backups.push(self.row_to_metadata(row)?);
183        }
184
185        Ok(backups)
186    }
187
188    async fn count_backups(&self) -> Result<u32> {
189        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM backup_metadata")
190            .fetch_one(self.db_pool.as_ref())
191            .await?;
192
193        Ok(count as u32)
194    }
195
196    async fn mark_backup_expired(&self, backup_id: &str) -> Result<()> {
197        sqlx::query("UPDATE backup_metadata SET status = 'Expired' WHERE id = $1")
198            .bind(backup_id)
199            .execute(self.db_pool.as_ref())
200            .await?;
201
202        Ok(())
203    }
204
205    async fn get_current_wal_lsn(&self) -> Result<String> {
206        let lsn: String = sqlx::query_scalar("SELECT pg_current_wal_lsn()")
207            .fetch_one(self.db_pool.as_ref())
208            .await?;
209        Ok(lsn)
210    }
211
212    async fn verify_postgres_config(&self) -> Result<()> {
213        debug!("Verifying PostgreSQL configuration for backups");
214
215        // Check if WAL archiving is enabled
216        let wal_level: String = sqlx::query_scalar("SHOW wal_level")
217            .fetch_one(self.db_pool.as_ref())
218            .await?;
219
220        if wal_level != "replica" && wal_level != "logical" {
221            return Err(BackupError::ConfigurationError {
222                message: format!("WAL level must be 'replica' or 'logical', found: {wal_level}"),
223            });
224        }
225
226        // Check if archiving is enabled
227        let archive_mode: String = sqlx::query_scalar("SHOW archive_mode")
228            .fetch_one(self.db_pool.as_ref())
229            .await?;
230
231        if archive_mode != "on" {
232            tracing::warn!("Archive mode is not enabled, continuous archiving will not work");
233        }
234
235        debug!("PostgreSQL configuration verified");
236        Ok(())
237    }
238}
239
240impl PostgresBackupRepository {
241    fn row_to_metadata(&self, row: sqlx::postgres::PgRow) -> Result<BackupMetadata> {
242        let backup_type_str: String = row.try_get("backup_type")?;
243        let backup_type = match backup_type_str.as_str() {
244            "Full" => BackupType::Full,
245            "Incremental" => BackupType::Incremental,
246            "Differential" => BackupType::Differential,
247            "WalArchive" => BackupType::WalArchive,
248            _ => BackupType::Full,
249        };
250
251        let status_str: String = row.try_get("status")?;
252        let status = match status_str.as_str() {
253            "InProgress" => BackupStatus::InProgress,
254            "Completed" => BackupStatus::Completed,
255            "Failed" => BackupStatus::Failed,
256            "Expired" => BackupStatus::Expired,
257            "Archived" => BackupStatus::Archived,
258            _ => BackupStatus::Failed,
259        };
260
261        Ok(BackupMetadata {
262            id: row.try_get("id")?,
263            backup_type,
264            status,
265            start_time: row.try_get("start_time")?,
266            end_time: row.try_get("end_time")?,
267            size_bytes: row.try_get::<i64, _>("size_bytes")? as u64,
268            compressed_size_bytes: row.try_get::<i64, _>("compressed_size_bytes")? as u64,
269            file_path: std::path::PathBuf::from(row.try_get::<String, _>("file_path")?),
270            checksum: row.try_get("checksum")?,
271            database_name: row.try_get("database_name")?,
272            wal_start_lsn: row.try_get("wal_start_lsn")?,
273            wal_end_lsn: row.try_get("wal_end_lsn")?,
274            encryption_enabled: row.try_get("encryption_enabled")?,
275            replication_status: std::collections::HashMap::new(),
276            verification_status: None,
277        })
278    }
279}
280
/// Stateless [`BackupRepository`] stand-in for tests; it holds no data and
/// needs no database connection.
#[derive(Debug)]
pub struct MockBackupRepository;
284
285#[async_trait]
286impl BackupRepository for MockBackupRepository {
287    async fn initialize(&self) -> Result<()> {
288        Ok(())
289    }
290    async fn store_metadata(&self, _metadata: &BackupMetadata) -> Result<()> {
291        Ok(())
292    }
293    async fn update_metadata(&self, _metadata: &BackupMetadata) -> Result<()> {
294        Ok(())
295    }
296    async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>> {
297        Ok(None)
298    }
299    async fn get_expired_backups(&self, _retention_days: u32) -> Result<Vec<BackupMetadata>> {
300        Ok(vec![])
301    }
302    async fn get_recent_backups(&self, _days: u32) -> Result<Vec<BackupMetadata>> {
303        Ok(vec![])
304    }
305    async fn count_backups(&self) -> Result<u32> {
306        Ok(0)
307    }
308    async fn mark_backup_expired(&self, _backup_id: &str) -> Result<()> {
309        Ok(())
310    }
311    async fn get_current_wal_lsn(&self) -> Result<String> {
312        Ok("0/0".to_string())
313    }
314    async fn verify_postgres_config(&self) -> Result<()> {
315        Ok(())
316    }
317}