1use super::{BackupError, BackupMetadata, BackupStatus, BackupType, Result};
2use async_trait::async_trait;
3use sqlx::{PgPool, Row};
4use std::sync::Arc;
5use tracing::debug;
6
7#[async_trait]
9pub trait BackupRepository: Send + Sync + std::fmt::Debug {
10 async fn initialize(&self) -> Result<()>;
11 async fn store_metadata(&self, metadata: &BackupMetadata) -> Result<()>;
12 async fn update_metadata(&self, metadata: &BackupMetadata) -> Result<()>;
13 async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>>;
14 async fn get_expired_backups(&self, retention_days: u32) -> Result<Vec<BackupMetadata>>;
15 async fn get_recent_backups(&self, days: u32) -> Result<Vec<BackupMetadata>>;
16 async fn count_backups(&self) -> Result<u32>;
17 async fn mark_backup_expired(&self, backup_id: &str) -> Result<()>;
18 async fn get_current_wal_lsn(&self) -> Result<String>;
19 async fn verify_postgres_config(&self) -> Result<()>;
20}
21
22#[derive(Debug)]
24pub struct PostgresBackupRepository {
25 db_pool: Arc<PgPool>,
26}
27
28impl PostgresBackupRepository {
29 pub fn new(db_pool: Arc<PgPool>) -> Self {
30 Self { db_pool }
31 }
32}
33
34#[async_trait]
35impl BackupRepository for PostgresBackupRepository {
36 async fn initialize(&self) -> Result<()> {
37 debug!("Initializing backup metadata store");
38
39 sqlx::query(
41 r#"
42 CREATE TABLE IF NOT EXISTS backup_metadata (
43 id VARCHAR PRIMARY KEY,
44 backup_type VARCHAR NOT NULL,
45 status VARCHAR NOT NULL,
46 start_time TIMESTAMPTZ NOT NULL,
47 end_time TIMESTAMPTZ,
48 size_bytes BIGINT DEFAULT 0,
49 compressed_size_bytes BIGINT DEFAULT 0,
50 file_path TEXT NOT NULL,
51 checksum VARCHAR,
52 database_name VARCHAR NOT NULL,
53 wal_start_lsn VARCHAR,
54 wal_end_lsn VARCHAR,
55 encryption_enabled BOOLEAN DEFAULT false,
56 metadata JSONB DEFAULT '{}',
57 created_at TIMESTAMPTZ DEFAULT NOW()
58 )
59 "#,
60 )
61 .execute(self.db_pool.as_ref())
62 .await?;
63
64 debug!("Backup metadata store initialized");
65 Ok(())
66 }
67
68 async fn store_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
69 sqlx::query(
70 r#"
71 INSERT INTO backup_metadata (
72 id, backup_type, status, start_time, end_time,
73 size_bytes, compressed_size_bytes, file_path, checksum,
74 database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
75 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
76 "#,
77 )
78 .bind(&metadata.id)
79 .bind(format!("{:?}", metadata.backup_type))
80 .bind(format!("{:?}", metadata.status))
81 .bind(metadata.start_time)
82 .bind(metadata.end_time)
83 .bind(metadata.size_bytes as i64)
84 .bind(metadata.compressed_size_bytes as i64)
85 .bind(metadata.file_path.to_string_lossy().as_ref())
86 .bind(&metadata.checksum)
87 .bind(&metadata.database_name)
88 .bind(&metadata.wal_start_lsn)
89 .bind(&metadata.wal_end_lsn)
90 .bind(metadata.encryption_enabled)
91 .execute(self.db_pool.as_ref())
92 .await?;
93
94 Ok(())
95 }
96
97 async fn update_metadata(&self, metadata: &BackupMetadata) -> Result<()> {
98 sqlx::query(
99 r#"
100 UPDATE backup_metadata SET
101 status = $2, end_time = $3, size_bytes = $4,
102 compressed_size_bytes = $5, checksum = $6,
103 wal_end_lsn = $7
104 WHERE id = $1
105 "#,
106 )
107 .bind(&metadata.id)
108 .bind(format!("{:?}", metadata.status))
109 .bind(metadata.end_time)
110 .bind(metadata.size_bytes as i64)
111 .bind(metadata.compressed_size_bytes as i64)
112 .bind(&metadata.checksum)
113 .bind(&metadata.wal_end_lsn)
114 .execute(self.db_pool.as_ref())
115 .await?;
116
117 Ok(())
118 }
119
120 async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>> {
121 let row = sqlx::query(
122 r#"
123 SELECT id, backup_type, status, start_time, end_time,
124 size_bytes, compressed_size_bytes, file_path, checksum,
125 database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
126 FROM backup_metadata
127 WHERE status = 'Completed'
128 ORDER BY start_time DESC
129 LIMIT 1
130 "#,
131 )
132 .fetch_optional(self.db_pool.as_ref())
133 .await?;
134
135 if let Some(row) = row {
136 Ok(Some(self.row_to_metadata(row)?))
137 } else {
138 Ok(None)
139 }
140 }
141
142 async fn get_expired_backups(&self, retention_days: u32) -> Result<Vec<BackupMetadata>> {
143 let rows = sqlx::query(
144 r#"
145 SELECT id, backup_type, status, start_time, end_time,
146 size_bytes, compressed_size_bytes, file_path, checksum,
147 database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
148 FROM backup_metadata
149 WHERE start_time < NOW() - INTERVAL '%d days'
150 AND status != 'Expired'
151 "#,
152 )
153 .bind(retention_days as i32)
154 .fetch_all(self.db_pool.as_ref())
155 .await?;
156
157 let mut backups = Vec::new();
158 for row in rows {
159 backups.push(self.row_to_metadata(row)?);
160 }
161
162 Ok(backups)
163 }
164
165 async fn get_recent_backups(&self, days: u32) -> Result<Vec<BackupMetadata>> {
166 let rows = sqlx::query(
167 r#"
168 SELECT id, backup_type, status, start_time, end_time,
169 size_bytes, compressed_size_bytes, file_path, checksum,
170 database_name, wal_start_lsn, wal_end_lsn, encryption_enabled
171 FROM backup_metadata
172 WHERE start_time > NOW() - INTERVAL '%d days'
173 ORDER BY start_time DESC
174 "#,
175 )
176 .bind(days as i32)
177 .fetch_all(self.db_pool.as_ref())
178 .await?;
179
180 let mut backups = Vec::new();
181 for row in rows {
182 backups.push(self.row_to_metadata(row)?);
183 }
184
185 Ok(backups)
186 }
187
188 async fn count_backups(&self) -> Result<u32> {
189 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM backup_metadata")
190 .fetch_one(self.db_pool.as_ref())
191 .await?;
192
193 Ok(count as u32)
194 }
195
196 async fn mark_backup_expired(&self, backup_id: &str) -> Result<()> {
197 sqlx::query("UPDATE backup_metadata SET status = 'Expired' WHERE id = $1")
198 .bind(backup_id)
199 .execute(self.db_pool.as_ref())
200 .await?;
201
202 Ok(())
203 }
204
205 async fn get_current_wal_lsn(&self) -> Result<String> {
206 let lsn: String = sqlx::query_scalar("SELECT pg_current_wal_lsn()")
207 .fetch_one(self.db_pool.as_ref())
208 .await?;
209 Ok(lsn)
210 }
211
212 async fn verify_postgres_config(&self) -> Result<()> {
213 debug!("Verifying PostgreSQL configuration for backups");
214
215 let wal_level: String = sqlx::query_scalar("SHOW wal_level")
217 .fetch_one(self.db_pool.as_ref())
218 .await?;
219
220 if wal_level != "replica" && wal_level != "logical" {
221 return Err(BackupError::ConfigurationError {
222 message: format!("WAL level must be 'replica' or 'logical', found: {wal_level}"),
223 });
224 }
225
226 let archive_mode: String = sqlx::query_scalar("SHOW archive_mode")
228 .fetch_one(self.db_pool.as_ref())
229 .await?;
230
231 if archive_mode != "on" {
232 tracing::warn!("Archive mode is not enabled, continuous archiving will not work");
233 }
234
235 debug!("PostgreSQL configuration verified");
236 Ok(())
237 }
238}
239
240impl PostgresBackupRepository {
241 fn row_to_metadata(&self, row: sqlx::postgres::PgRow) -> Result<BackupMetadata> {
242 let backup_type_str: String = row.try_get("backup_type")?;
243 let backup_type = match backup_type_str.as_str() {
244 "Full" => BackupType::Full,
245 "Incremental" => BackupType::Incremental,
246 "Differential" => BackupType::Differential,
247 "WalArchive" => BackupType::WalArchive,
248 _ => BackupType::Full,
249 };
250
251 let status_str: String = row.try_get("status")?;
252 let status = match status_str.as_str() {
253 "InProgress" => BackupStatus::InProgress,
254 "Completed" => BackupStatus::Completed,
255 "Failed" => BackupStatus::Failed,
256 "Expired" => BackupStatus::Expired,
257 "Archived" => BackupStatus::Archived,
258 _ => BackupStatus::Failed,
259 };
260
261 Ok(BackupMetadata {
262 id: row.try_get("id")?,
263 backup_type,
264 status,
265 start_time: row.try_get("start_time")?,
266 end_time: row.try_get("end_time")?,
267 size_bytes: row.try_get::<i64, _>("size_bytes")? as u64,
268 compressed_size_bytes: row.try_get::<i64, _>("compressed_size_bytes")? as u64,
269 file_path: std::path::PathBuf::from(row.try_get::<String, _>("file_path")?),
270 checksum: row.try_get("checksum")?,
271 database_name: row.try_get("database_name")?,
272 wal_start_lsn: row.try_get("wal_start_lsn")?,
273 wal_end_lsn: row.try_get("wal_end_lsn")?,
274 encryption_enabled: row.try_get("encryption_enabled")?,
275 replication_status: std::collections::HashMap::new(),
276 verification_status: None,
277 })
278 }
279}
280
281#[derive(Debug)]
283pub struct MockBackupRepository;
284
285#[async_trait]
286impl BackupRepository for MockBackupRepository {
287 async fn initialize(&self) -> Result<()> {
288 Ok(())
289 }
290 async fn store_metadata(&self, _metadata: &BackupMetadata) -> Result<()> {
291 Ok(())
292 }
293 async fn update_metadata(&self, _metadata: &BackupMetadata) -> Result<()> {
294 Ok(())
295 }
296 async fn get_latest_backup(&self) -> Result<Option<BackupMetadata>> {
297 Ok(None)
298 }
299 async fn get_expired_backups(&self, _retention_days: u32) -> Result<Vec<BackupMetadata>> {
300 Ok(vec![])
301 }
302 async fn get_recent_backups(&self, _days: u32) -> Result<Vec<BackupMetadata>> {
303 Ok(vec![])
304 }
305 async fn count_backups(&self) -> Result<u32> {
306 Ok(0)
307 }
308 async fn mark_backup_expired(&self, _backup_id: &str) -> Result<()> {
309 Ok(())
310 }
311 async fn get_current_wal_lsn(&self) -> Result<String> {
312 Ok("0/0".to_string())
313 }
314 async fn verify_postgres_config(&self) -> Result<()> {
315 Ok(())
316 }
317}