1use crate::types::{BackupResult, PurgeEvent, RetentionRule};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use tracing::{info, warn};
10
11pub fn load_rules(pool: &ConnPool) -> BackupResult<Vec<RetentionRule>> {
14 let conn = pool.get()?;
15 let mut stmt = conn.prepare(
16 "SELECT table_name, timestamp_column, max_age_days \
17 FROM backup_retention_rules WHERE org_id = '__global__'",
18 )?;
19 let rules: Vec<RetentionRule> = stmt
20 .query_map([], |row| {
21 Ok(RetentionRule {
22 table: row.get(0)?,
23 timestamp_column: row.get(1)?,
24 max_age_days: row.get(2)?,
25 })
26 })?
27 .collect::<Result<Vec<_>, _>>()?;
28 if rules.is_empty() {
29 return Ok(crate::types::default_retention_rules());
30 }
31 Ok(rules)
32}
33
34pub fn load_org_rules(pool: &ConnPool, org_id: &str) -> BackupResult<Vec<RetentionRule>> {
36 let conn = pool.get()?;
37 let mut stmt = conn.prepare(
38 "SELECT table_name, timestamp_column, max_age_days \
39 FROM backup_retention_rules WHERE org_id = ?1",
40 )?;
41 let rules: Vec<RetentionRule> = stmt
42 .query_map(params![org_id], |row| {
43 Ok(RetentionRule {
44 table: row.get(0)?,
45 timestamp_column: row.get(1)?,
46 max_age_days: row.get(2)?,
47 })
48 })?
49 .collect::<Result<Vec<_>, _>>()?;
50 Ok(rules)
51}
52
53pub fn save_rule(pool: &ConnPool, rule: &RetentionRule, org_id: Option<&str>) -> BackupResult<()> {
55 crate::types::validate_sql_identifier(&rule.table)?;
57 crate::types::validate_sql_identifier(&rule.timestamp_column)?;
58 if rule.max_age_days == 0 {
59 return Err(crate::types::BackupError::InvalidConfig(
60 "max_age_days must be > 0".into(),
61 ));
62 }
63
64 let conn = pool.get()?;
65 let effective_org = org_id.unwrap_or("__global__");
66 conn.execute(
67 "INSERT INTO backup_retention_rules \
68 (table_name, timestamp_column, max_age_days, org_id) \
69 VALUES (?1, ?2, ?3, ?4) \
70 ON CONFLICT(table_name, org_id) DO UPDATE SET \
71 timestamp_column = excluded.timestamp_column, \
72 max_age_days = excluded.max_age_days",
73 params![
74 rule.table,
75 rule.timestamp_column,
76 rule.max_age_days,
77 effective_org
78 ],
79 )?;
80 Ok(())
81}
82
83pub fn purge_table(pool: &ConnPool, rule: &RetentionRule) -> BackupResult<PurgeEvent> {
85 crate::types::validate_sql_identifier(&rule.table)?;
87 crate::types::validate_sql_identifier(&rule.timestamp_column)?;
88
89 let conn = pool.get()?;
90 let cutoff = format!("-{} days", rule.max_age_days);
91
92 let exists: bool = conn
94 .query_row(
95 "SELECT COUNT(*) FROM sqlite_master \
96 WHERE type='table' AND name=?1",
97 params![rule.table],
98 |r| r.get::<_, i64>(0),
99 )
100 .map(|c| c > 0)?;
101
102 if !exists {
103 warn!(table = %rule.table, "table does not exist, skipping purge");
104 return Ok(PurgeEvent {
105 table: rule.table.clone(),
106 rows_deleted: 0,
107 cutoff_date: cutoff,
108 executed_at: chrono::Utc::now().to_rfc3339(),
109 });
110 }
111
112 let sql_count = format!(
114 "SELECT COUNT(*) FROM {} WHERE {} < datetime('now', ?1)",
115 rule.table, rule.timestamp_column,
116 );
117 let count: i64 = conn.query_row(&sql_count, params![cutoff], |r| r.get(0))?;
118
119 if count > 0 {
120 let sql_delete = format!(
121 "DELETE FROM {} WHERE {} < datetime('now', ?1)",
122 rule.table, rule.timestamp_column,
123 );
124 conn.execute(&sql_delete, params![cutoff])?;
125 }
126
127 let now = chrono::Utc::now().to_rfc3339();
128 conn.execute(
130 "INSERT INTO backup_purge_log (table_name, rows_deleted, cutoff_date) \
131 VALUES (?1, ?2, ?3)",
132 params![rule.table, count, cutoff],
133 )?;
134
135 info!(table = %rule.table, rows = count, "retention purge completed");
136
137 Ok(PurgeEvent {
138 table: rule.table.clone(),
139 rows_deleted: count,
140 cutoff_date: cutoff,
141 executed_at: now,
142 })
143}
144
145pub fn run_auto_purge(pool: &ConnPool) -> BackupResult<Vec<PurgeEvent>> {
147 let rules = load_rules(pool)?;
148 let mut events = Vec::new();
149 for rule in &rules {
150 match purge_table(pool, rule) {
151 Ok(ev) => events.push(ev),
152 Err(e) => {
153 warn!(table = %rule.table, err = %e, "purge failed for table");
154 }
155 }
156 }
157 Ok(events)
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163
164 fn setup_pool() -> ConnPool {
165 let pool = convergio_db::pool::create_memory_pool().unwrap();
166 let conn = pool.get().unwrap();
167 for m in crate::schema::migrations() {
168 conn.execute_batch(m.up).unwrap();
169 }
170 conn.execute_batch(
171 "CREATE TABLE IF NOT EXISTS audit_log (
172 id INTEGER PRIMARY KEY, msg TEXT,
173 created_at TEXT DEFAULT (datetime('now'))
174 )",
175 )
176 .unwrap();
177 drop(conn);
178 pool
179 }
180
181 #[test]
182 fn load_rules_returns_defaults_when_empty() {
183 let pool = setup_pool();
184 let rules = load_rules(&pool).unwrap();
185 assert_eq!(rules.len(), 2);
186 }
187
188 #[test]
189 fn save_and_load_custom_rule() {
190 let pool = setup_pool();
191 let rule = RetentionRule {
192 table: "audit_log".into(),
193 timestamp_column: "created_at".into(),
194 max_age_days: 90,
195 };
196 save_rule(&pool, &rule, None).unwrap();
197 let rules = load_rules(&pool).unwrap();
198 assert_eq!(rules.len(), 1);
199 assert_eq!(rules[0].max_age_days, 90);
200 }
201
202 #[test]
203 fn save_rule_rejects_zero_max_age() {
204 let pool = setup_pool();
205 let rule = RetentionRule {
206 table: "audit_log".into(),
207 timestamp_column: "created_at".into(),
208 max_age_days: 0,
209 };
210 assert!(save_rule(&pool, &rule, None).is_err());
211 }
212
213 #[test]
214 fn save_rule_rejects_sql_injection_table() {
215 let pool = setup_pool();
216 let rule = RetentionRule {
217 table: "audit_log; DROP TABLE backup_snapshots".into(),
218 timestamp_column: "created_at".into(),
219 max_age_days: 30,
220 };
221 assert!(save_rule(&pool, &rule, None).is_err());
222 }
223
224 #[test]
225 fn purge_rejects_sql_injection_column() {
226 let pool = setup_pool();
227 let rule = RetentionRule {
228 table: "audit_log".into(),
229 timestamp_column: "created_at; DROP TABLE audit_log".into(),
230 max_age_days: 365,
231 };
232 assert!(purge_table(&pool, &rule).is_err());
233 }
234
235 #[test]
236 fn purge_table_deletes_old_rows() {
237 let pool = setup_pool();
238 let conn = pool.get().unwrap();
239 conn.execute(
241 "INSERT INTO audit_log (msg, created_at) \
242 VALUES ('old', datetime('now', '-400 days'))",
243 [],
244 )
245 .unwrap();
246 conn.execute(
247 "INSERT INTO audit_log (msg, created_at) \
248 VALUES ('new', datetime('now'))",
249 [],
250 )
251 .unwrap();
252 drop(conn);
253
254 let rule = RetentionRule {
255 table: "audit_log".into(),
256 timestamp_column: "created_at".into(),
257 max_age_days: 365,
258 };
259 let event = purge_table(&pool, &rule).unwrap();
260 assert_eq!(event.rows_deleted, 1);
261
262 let conn = pool.get().unwrap();
263 let remaining: i64 = conn
264 .query_row("SELECT COUNT(*) FROM audit_log", [], |r| r.get(0))
265 .unwrap();
266 assert_eq!(remaining, 1);
267 }
268
269 #[test]
270 fn purge_nonexistent_table_returns_zero() {
271 let pool = setup_pool();
272 let rule = RetentionRule {
273 table: "nonexistent_table".into(),
274 timestamp_column: "created_at".into(),
275 max_age_days: 7,
276 };
277 let event = purge_table(&pool, &rule).unwrap();
278 assert_eq!(event.rows_deleted, 0);
279 }
280
281 #[test]
282 fn run_auto_purge_processes_all_rules() {
283 let pool = setup_pool();
284 let events = run_auto_purge(&pool).unwrap();
285 assert_eq!(events.len(), 2);
287 }
288}