1use crate::types::{BackupResult, PurgeEvent, RetentionRule};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use tracing::{info, warn};
10
11pub fn load_rules(pool: &ConnPool) -> BackupResult<Vec<RetentionRule>> {
14 let conn = pool.get()?;
15 let mut stmt = conn.prepare(
16 "SELECT table_name, timestamp_column, max_age_days \
17 FROM backup_retention_rules WHERE org_id = '__global__'",
18 )?;
19 let rules: Vec<RetentionRule> = stmt
20 .query_map([], |row| {
21 Ok(RetentionRule {
22 table: row.get(0)?,
23 timestamp_column: row.get(1)?,
24 max_age_days: row.get(2)?,
25 })
26 })?
27 .filter_map(|r| r.ok())
28 .collect();
29 if rules.is_empty() {
30 return Ok(crate::types::default_retention_rules());
31 }
32 Ok(rules)
33}
34
35pub fn load_org_rules(pool: &ConnPool, org_id: &str) -> BackupResult<Vec<RetentionRule>> {
37 let conn = pool.get()?;
38 let mut stmt = conn.prepare(
39 "SELECT table_name, timestamp_column, max_age_days \
40 FROM backup_retention_rules WHERE org_id = ?1",
41 )?;
42 let rules: Vec<RetentionRule> = stmt
43 .query_map(params![org_id], |row| {
44 Ok(RetentionRule {
45 table: row.get(0)?,
46 timestamp_column: row.get(1)?,
47 max_age_days: row.get(2)?,
48 })
49 })?
50 .filter_map(|r| r.ok())
51 .collect();
52 Ok(rules)
53}
54
55pub fn save_rule(pool: &ConnPool, rule: &RetentionRule, org_id: Option<&str>) -> BackupResult<()> {
57 let conn = pool.get()?;
58 let effective_org = org_id.unwrap_or("__global__");
59 conn.execute(
60 "INSERT INTO backup_retention_rules \
61 (table_name, timestamp_column, max_age_days, org_id) \
62 VALUES (?1, ?2, ?3, ?4) \
63 ON CONFLICT(table_name, org_id) DO UPDATE SET \
64 timestamp_column = excluded.timestamp_column, \
65 max_age_days = excluded.max_age_days",
66 params![
67 rule.table,
68 rule.timestamp_column,
69 rule.max_age_days,
70 effective_org
71 ],
72 )?;
73 Ok(())
74}
75
76pub fn purge_table(pool: &ConnPool, rule: &RetentionRule) -> BackupResult<PurgeEvent> {
78 let conn = pool.get()?;
79 let cutoff = format!("-{} days", rule.max_age_days);
80
81 let exists: bool = conn
83 .query_row(
84 "SELECT COUNT(*) FROM sqlite_master \
85 WHERE type='table' AND name=?1",
86 params![rule.table],
87 |r| r.get::<_, i64>(0),
88 )
89 .map(|c| c > 0)?;
90
91 if !exists {
92 warn!(table = %rule.table, "table does not exist, skipping purge");
93 return Ok(PurgeEvent {
94 table: rule.table.clone(),
95 rows_deleted: 0,
96 cutoff_date: cutoff,
97 executed_at: chrono::Utc::now().to_rfc3339(),
98 });
99 }
100
101 let sql_count = format!(
103 "SELECT COUNT(*) FROM {} WHERE {} < datetime('now', ?1)",
104 rule.table, rule.timestamp_column,
105 );
106 let count: i64 = conn.query_row(&sql_count, params![cutoff], |r| r.get(0))?;
107
108 if count > 0 {
109 let sql_delete = format!(
110 "DELETE FROM {} WHERE {} < datetime('now', ?1)",
111 rule.table, rule.timestamp_column,
112 );
113 conn.execute(&sql_delete, params![cutoff])?;
114 }
115
116 let now = chrono::Utc::now().to_rfc3339();
117 conn.execute(
119 "INSERT INTO backup_purge_log (table_name, rows_deleted, cutoff_date) \
120 VALUES (?1, ?2, ?3)",
121 params![rule.table, count, cutoff],
122 )?;
123
124 info!(table = %rule.table, rows = count, "retention purge completed");
125
126 Ok(PurgeEvent {
127 table: rule.table.clone(),
128 rows_deleted: count,
129 cutoff_date: cutoff,
130 executed_at: now,
131 })
132}
133
134pub fn run_auto_purge(pool: &ConnPool) -> BackupResult<Vec<PurgeEvent>> {
136 let rules = load_rules(pool)?;
137 let mut events = Vec::new();
138 for rule in &rules {
139 match purge_table(pool, rule) {
140 Ok(ev) => events.push(ev),
141 Err(e) => {
142 warn!(table = %rule.table, err = %e, "purge failed for table");
143 }
144 }
145 }
146 Ok(events)
147}
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152
153 fn setup_pool() -> ConnPool {
154 let pool = convergio_db::pool::create_memory_pool().unwrap();
155 let conn = pool.get().unwrap();
156 for m in crate::schema::migrations() {
157 conn.execute_batch(m.up).unwrap();
158 }
159 conn.execute_batch(
160 "CREATE TABLE IF NOT EXISTS audit_log (
161 id INTEGER PRIMARY KEY, msg TEXT,
162 created_at TEXT DEFAULT (datetime('now'))
163 )",
164 )
165 .unwrap();
166 drop(conn);
167 pool
168 }
169
170 #[test]
171 fn load_rules_returns_defaults_when_empty() {
172 let pool = setup_pool();
173 let rules = load_rules(&pool).unwrap();
174 assert_eq!(rules.len(), 2);
175 }
176
177 #[test]
178 fn save_and_load_custom_rule() {
179 let pool = setup_pool();
180 let rule = RetentionRule {
181 table: "audit_log".into(),
182 timestamp_column: "created_at".into(),
183 max_age_days: 90,
184 };
185 save_rule(&pool, &rule, None).unwrap();
186 let rules = load_rules(&pool).unwrap();
187 assert_eq!(rules.len(), 1);
188 assert_eq!(rules[0].max_age_days, 90);
189 }
190
191 #[test]
192 fn purge_table_deletes_old_rows() {
193 let pool = setup_pool();
194 let conn = pool.get().unwrap();
195 conn.execute(
197 "INSERT INTO audit_log (msg, created_at) \
198 VALUES ('old', datetime('now', '-400 days'))",
199 [],
200 )
201 .unwrap();
202 conn.execute(
203 "INSERT INTO audit_log (msg, created_at) \
204 VALUES ('new', datetime('now'))",
205 [],
206 )
207 .unwrap();
208 drop(conn);
209
210 let rule = RetentionRule {
211 table: "audit_log".into(),
212 timestamp_column: "created_at".into(),
213 max_age_days: 365,
214 };
215 let event = purge_table(&pool, &rule).unwrap();
216 assert_eq!(event.rows_deleted, 1);
217
218 let conn = pool.get().unwrap();
219 let remaining: i64 = conn
220 .query_row("SELECT COUNT(*) FROM audit_log", [], |r| r.get(0))
221 .unwrap();
222 assert_eq!(remaining, 1);
223 }
224
225 #[test]
226 fn purge_nonexistent_table_returns_zero() {
227 let pool = setup_pool();
228 let rule = RetentionRule {
229 table: "nonexistent_table".into(),
230 timestamp_column: "created_at".into(),
231 max_age_days: 7,
232 };
233 let event = purge_table(&pool, &rule).unwrap();
234 assert_eq!(event.rows_deleted, 0);
235 }
236
237 #[test]
238 fn run_auto_purge_processes_all_rules() {
239 let pool = setup_pool();
240 let events = run_auto_purge(&pool).unwrap();
241 assert_eq!(events.len(), 2);
243 }
244}