Skip to main content

convergio_backup/
retention.rs

//! Retention policy engine — auto-purge expired data.
//!
//! Never deletes silently: every purge is logged to backup_purge_log
//! and emits a PurgeEvent for SSE streaming.
5
6use crate::types::{BackupResult, PurgeEvent, RetentionRule};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use tracing::{info, warn};
10
11/// Load retention rules from the database.
12/// Falls back to default rules if none are configured.
13pub fn load_rules(pool: &ConnPool) -> BackupResult<Vec<RetentionRule>> {
14    let conn = pool.get()?;
15    let mut stmt = conn.prepare(
16        "SELECT table_name, timestamp_column, max_age_days \
17         FROM backup_retention_rules WHERE org_id = '__global__'",
18    )?;
19    let rules: Vec<RetentionRule> = stmt
20        .query_map([], |row| {
21            Ok(RetentionRule {
22                table: row.get(0)?,
23                timestamp_column: row.get(1)?,
24                max_age_days: row.get(2)?,
25            })
26        })?
27        .collect::<Result<Vec<_>, _>>()?;
28    if rules.is_empty() {
29        return Ok(crate::types::default_retention_rules());
30    }
31    Ok(rules)
32}
33
34/// Load org-specific retention rules (override defaults).
35pub fn load_org_rules(pool: &ConnPool, org_id: &str) -> BackupResult<Vec<RetentionRule>> {
36    let conn = pool.get()?;
37    let mut stmt = conn.prepare(
38        "SELECT table_name, timestamp_column, max_age_days \
39         FROM backup_retention_rules WHERE org_id = ?1",
40    )?;
41    let rules: Vec<RetentionRule> = stmt
42        .query_map(params![org_id], |row| {
43            Ok(RetentionRule {
44                table: row.get(0)?,
45                timestamp_column: row.get(1)?,
46                max_age_days: row.get(2)?,
47            })
48        })?
49        .collect::<Result<Vec<_>, _>>()?;
50    Ok(rules)
51}
52
53/// Save a retention rule to the database.
54pub fn save_rule(pool: &ConnPool, rule: &RetentionRule, org_id: Option<&str>) -> BackupResult<()> {
55    // Validate identifiers
56    crate::types::validate_sql_identifier(&rule.table)?;
57    crate::types::validate_sql_identifier(&rule.timestamp_column)?;
58    if rule.max_age_days == 0 {
59        return Err(crate::types::BackupError::InvalidConfig(
60            "max_age_days must be > 0".into(),
61        ));
62    }
63
64    let conn = pool.get()?;
65    let effective_org = org_id.unwrap_or("__global__");
66    conn.execute(
67        "INSERT INTO backup_retention_rules \
68         (table_name, timestamp_column, max_age_days, org_id) \
69         VALUES (?1, ?2, ?3, ?4) \
70         ON CONFLICT(table_name, org_id) DO UPDATE SET \
71         timestamp_column = excluded.timestamp_column, \
72         max_age_days = excluded.max_age_days",
73        params![
74            rule.table,
75            rule.timestamp_column,
76            rule.max_age_days,
77            effective_org
78        ],
79    )?;
80    Ok(())
81}
82
83/// Execute purge for a single retention rule. Returns a PurgeEvent.
84pub fn purge_table(pool: &ConnPool, rule: &RetentionRule) -> BackupResult<PurgeEvent> {
85    // Validate identifiers to prevent SQL injection
86    crate::types::validate_sql_identifier(&rule.table)?;
87    crate::types::validate_sql_identifier(&rule.timestamp_column)?;
88
89    let conn = pool.get()?;
90    let cutoff = format!("-{} days", rule.max_age_days);
91
92    // Check if table exists before attempting purge
93    let exists: bool = conn
94        .query_row(
95            "SELECT COUNT(*) FROM sqlite_master \
96             WHERE type='table' AND name=?1",
97            params![rule.table],
98            |r| r.get::<_, i64>(0),
99        )
100        .map(|c| c > 0)?;
101
102    if !exists {
103        warn!(table = %rule.table, "table does not exist, skipping purge");
104        return Ok(PurgeEvent {
105            table: rule.table.clone(),
106            rows_deleted: 0,
107            cutoff_date: cutoff,
108            executed_at: chrono::Utc::now().to_rfc3339(),
109        });
110    }
111
112    // Count rows to be deleted (for logging)
113    let sql_count = format!(
114        "SELECT COUNT(*) FROM {} WHERE {} < datetime('now', ?1)",
115        rule.table, rule.timestamp_column,
116    );
117    let count: i64 = conn.query_row(&sql_count, params![cutoff], |r| r.get(0))?;
118
119    if count > 0 {
120        let sql_delete = format!(
121            "DELETE FROM {} WHERE {} < datetime('now', ?1)",
122            rule.table, rule.timestamp_column,
123        );
124        conn.execute(&sql_delete, params![cutoff])?;
125    }
126
127    let now = chrono::Utc::now().to_rfc3339();
128    // Log the purge
129    conn.execute(
130        "INSERT INTO backup_purge_log (table_name, rows_deleted, cutoff_date) \
131         VALUES (?1, ?2, ?3)",
132        params![rule.table, count, cutoff],
133    )?;
134
135    info!(table = %rule.table, rows = count, "retention purge completed");
136
137    Ok(PurgeEvent {
138        table: rule.table.clone(),
139        rows_deleted: count,
140        cutoff_date: cutoff,
141        executed_at: now,
142    })
143}
144
145/// Run auto-purge for all configured retention rules.
146pub fn run_auto_purge(pool: &ConnPool) -> BackupResult<Vec<PurgeEvent>> {
147    let rules = load_rules(pool)?;
148    let mut events = Vec::new();
149    for rule in &rules {
150        match purge_table(pool, rule) {
151            Ok(ev) => events.push(ev),
152            Err(e) => {
153                warn!(table = %rule.table, err = %e, "purge failed for table");
154            }
155        }
156    }
157    Ok(events)
158}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an in-memory pool with every schema migration applied, plus an
    /// `audit_log` table for the purge tests to operate on.
    fn setup_pool() -> ConnPool {
        let pool = convergio_db::pool::create_memory_pool().unwrap();
        let conn = pool.get().unwrap();
        for m in crate::schema::migrations() {
            conn.execute_batch(m.up).unwrap();
        }
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS audit_log (
                id INTEGER PRIMARY KEY, msg TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )",
        )
        .unwrap();
        // Hand the connection back before returning the pool to the test.
        drop(conn);
        pool
    }

    /// With no stored rules, the two built-in defaults are returned.
    #[test]
    fn load_rules_returns_defaults_when_empty() {
        let pool = setup_pool();
        let loaded = load_rules(&pool).unwrap();
        assert_eq!(loaded.len(), 2);
    }

    /// A saved global rule replaces the defaults on the next load.
    #[test]
    fn save_and_load_custom_rule() {
        let pool = setup_pool();
        let custom = RetentionRule {
            table: "audit_log".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 90,
        };
        save_rule(&pool, &custom, None).unwrap();

        let loaded = load_rules(&pool).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].max_age_days, 90);
    }

    /// A zero-day retention window is refused.
    #[test]
    fn save_rule_rejects_zero_max_age() {
        let pool = setup_pool();
        let zero_age = RetentionRule {
            table: "audit_log".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 0,
        };
        let outcome = save_rule(&pool, &zero_age, None);
        assert!(outcome.is_err());
    }

    /// A table name carrying extra SQL is refused on save.
    #[test]
    fn save_rule_rejects_sql_injection_table() {
        let pool = setup_pool();
        let hostile = RetentionRule {
            table: "audit_log; DROP TABLE backup_snapshots".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 30,
        };
        let outcome = save_rule(&pool, &hostile, None);
        assert!(outcome.is_err());
    }

    /// A column name carrying extra SQL is refused on purge.
    #[test]
    fn purge_rejects_sql_injection_column() {
        let pool = setup_pool();
        let hostile = RetentionRule {
            table: "audit_log".into(),
            timestamp_column: "created_at; DROP TABLE audit_log".into(),
            max_age_days: 365,
        };
        let outcome = purge_table(&pool, &hostile);
        assert!(outcome.is_err());
    }

    /// Only the row older than the retention window is removed.
    #[test]
    fn purge_table_deletes_old_rows() {
        let pool = setup_pool();
        {
            // Seed one expired row and one fresh row.
            let seed = pool.get().unwrap();
            seed.execute(
                "INSERT INTO audit_log (msg, created_at) \
                 VALUES ('old', datetime('now', '-400 days'))",
                [],
            )
            .unwrap();
            seed.execute(
                "INSERT INTO audit_log (msg, created_at) \
                 VALUES ('new', datetime('now'))",
                [],
            )
            .unwrap();
        }

        let one_year = RetentionRule {
            table: "audit_log".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 365,
        };
        let purged = purge_table(&pool, &one_year).unwrap();
        assert_eq!(purged.rows_deleted, 1);

        let check = pool.get().unwrap();
        let left: i64 = check
            .query_row("SELECT COUNT(*) FROM audit_log", [], |r| r.get(0))
            .unwrap();
        assert_eq!(left, 1);
    }

    /// Purging a missing table succeeds and reports zero deleted rows.
    #[test]
    fn purge_nonexistent_table_returns_zero() {
        let pool = setup_pool();
        let missing = RetentionRule {
            table: "nonexistent_table".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 7,
        };
        let purged = purge_table(&pool, &missing).unwrap();
        assert_eq!(purged.rows_deleted, 0);
    }

    /// Every default rule yields an event, even when its table is absent.
    #[test]
    fn run_auto_purge_processes_all_rules() {
        let pool = setup_pool();
        let produced = run_auto_purge(&pool).unwrap();
        // Default rules: audit_log (exists) + ipc_messages (does not)
        assert_eq!(produced.len(), 2);
    }
}