Skip to main content

convergio_backup/
retention.rs

1//! Retention policy engine — auto-purge expired data.
2//!
3//! Never deletes silently: every purge is logged to backup_purge_log
4//! and emits a PurgeEvent for SSE streaming.
5
6use crate::types::{BackupResult, PurgeEvent, RetentionRule};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use tracing::{info, warn};
10
11/// Load retention rules from the database.
12/// Falls back to default rules if none are configured.
13pub fn load_rules(pool: &ConnPool) -> BackupResult<Vec<RetentionRule>> {
14    let conn = pool.get()?;
15    let mut stmt = conn.prepare(
16        "SELECT table_name, timestamp_column, max_age_days \
17         FROM backup_retention_rules WHERE org_id = '__global__'",
18    )?;
19    let rules: Vec<RetentionRule> = stmt
20        .query_map([], |row| {
21            Ok(RetentionRule {
22                table: row.get(0)?,
23                timestamp_column: row.get(1)?,
24                max_age_days: row.get(2)?,
25            })
26        })?
27        .filter_map(|r| r.ok())
28        .collect();
29    if rules.is_empty() {
30        return Ok(crate::types::default_retention_rules());
31    }
32    Ok(rules)
33}
34
35/// Load org-specific retention rules (override defaults).
36pub fn load_org_rules(pool: &ConnPool, org_id: &str) -> BackupResult<Vec<RetentionRule>> {
37    let conn = pool.get()?;
38    let mut stmt = conn.prepare(
39        "SELECT table_name, timestamp_column, max_age_days \
40         FROM backup_retention_rules WHERE org_id = ?1",
41    )?;
42    let rules: Vec<RetentionRule> = stmt
43        .query_map(params![org_id], |row| {
44            Ok(RetentionRule {
45                table: row.get(0)?,
46                timestamp_column: row.get(1)?,
47                max_age_days: row.get(2)?,
48            })
49        })?
50        .filter_map(|r| r.ok())
51        .collect();
52    Ok(rules)
53}
54
55/// Save a retention rule to the database.
56pub fn save_rule(pool: &ConnPool, rule: &RetentionRule, org_id: Option<&str>) -> BackupResult<()> {
57    let conn = pool.get()?;
58    let effective_org = org_id.unwrap_or("__global__");
59    conn.execute(
60        "INSERT INTO backup_retention_rules \
61         (table_name, timestamp_column, max_age_days, org_id) \
62         VALUES (?1, ?2, ?3, ?4) \
63         ON CONFLICT(table_name, org_id) DO UPDATE SET \
64         timestamp_column = excluded.timestamp_column, \
65         max_age_days = excluded.max_age_days",
66        params![
67            rule.table,
68            rule.timestamp_column,
69            rule.max_age_days,
70            effective_org
71        ],
72    )?;
73    Ok(())
74}
75
76/// Execute purge for a single retention rule. Returns a PurgeEvent.
77pub fn purge_table(pool: &ConnPool, rule: &RetentionRule) -> BackupResult<PurgeEvent> {
78    let conn = pool.get()?;
79    let cutoff = format!("-{} days", rule.max_age_days);
80
81    // Check if table exists before attempting purge
82    let exists: bool = conn
83        .query_row(
84            "SELECT COUNT(*) FROM sqlite_master \
85             WHERE type='table' AND name=?1",
86            params![rule.table],
87            |r| r.get::<_, i64>(0),
88        )
89        .map(|c| c > 0)?;
90
91    if !exists {
92        warn!(table = %rule.table, "table does not exist, skipping purge");
93        return Ok(PurgeEvent {
94            table: rule.table.clone(),
95            rows_deleted: 0,
96            cutoff_date: cutoff,
97            executed_at: chrono::Utc::now().to_rfc3339(),
98        });
99    }
100
101    // Count rows to be deleted (for logging)
102    let sql_count = format!(
103        "SELECT COUNT(*) FROM {} WHERE {} < datetime('now', ?1)",
104        rule.table, rule.timestamp_column,
105    );
106    let count: i64 = conn.query_row(&sql_count, params![cutoff], |r| r.get(0))?;
107
108    if count > 0 {
109        let sql_delete = format!(
110            "DELETE FROM {} WHERE {} < datetime('now', ?1)",
111            rule.table, rule.timestamp_column,
112        );
113        conn.execute(&sql_delete, params![cutoff])?;
114    }
115
116    let now = chrono::Utc::now().to_rfc3339();
117    // Log the purge
118    conn.execute(
119        "INSERT INTO backup_purge_log (table_name, rows_deleted, cutoff_date) \
120         VALUES (?1, ?2, ?3)",
121        params![rule.table, count, cutoff],
122    )?;
123
124    info!(table = %rule.table, rows = count, "retention purge completed");
125
126    Ok(PurgeEvent {
127        table: rule.table.clone(),
128        rows_deleted: count,
129        cutoff_date: cutoff,
130        executed_at: now,
131    })
132}
133
134/// Run auto-purge for all configured retention rules.
135pub fn run_auto_purge(pool: &ConnPool) -> BackupResult<Vec<PurgeEvent>> {
136    let rules = load_rules(pool)?;
137    let mut events = Vec::new();
138    for rule in &rules {
139        match purge_table(pool, rule) {
140            Ok(ev) => events.push(ev),
141            Err(e) => {
142                warn!(table = %rule.table, err = %e, "purge failed for table");
143            }
144        }
145    }
146    Ok(events)
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an in-memory pool with every crate migration applied, plus an
    /// `audit_log` table for the purge tests to operate on.
    fn setup_pool() -> ConnPool {
        let pool = convergio_db::pool::create_memory_pool().unwrap();
        {
            // Scoped so the connection is handed back before the pool is returned.
            let conn = pool.get().unwrap();
            for migration in crate::schema::migrations() {
                conn.execute_batch(migration.up).unwrap();
            }
            conn.execute_batch(
                "CREATE TABLE IF NOT EXISTS audit_log (
                id INTEGER PRIMARY KEY, msg TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )",
            )
            .unwrap();
        }
        pool
    }

    /// With no stored rules, the two built-in defaults are returned.
    #[test]
    fn load_rules_returns_defaults_when_empty() {
        let pool = setup_pool();
        let loaded = load_rules(&pool).unwrap();
        assert_eq!(loaded.len(), 2);
    }

    /// A saved global rule replaces the defaults on the next load.
    #[test]
    fn save_and_load_custom_rule() {
        let pool = setup_pool();
        let custom = RetentionRule {
            table: "audit_log".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 90,
        };
        save_rule(&pool, &custom, None).unwrap();

        let loaded = load_rules(&pool).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].max_age_days, 90);
    }

    /// Only the row older than the retention window is removed.
    #[test]
    fn purge_table_deletes_old_rows() {
        let pool = setup_pool();
        {
            // One row well past the 365-day window, one fresh row.
            let conn = pool.get().unwrap();
            for sql in [
                "INSERT INTO audit_log (msg, created_at) \
                 VALUES ('old', datetime('now', '-400 days'))",
                "INSERT INTO audit_log (msg, created_at) \
                 VALUES ('new', datetime('now'))",
            ] {
                conn.execute(sql, []).unwrap();
            }
        }

        let one_year = RetentionRule {
            table: "audit_log".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 365,
        };
        let outcome = purge_table(&pool, &one_year).unwrap();
        assert_eq!(outcome.rows_deleted, 1);

        let conn = pool.get().unwrap();
        let remaining: i64 = conn
            .query_row("SELECT COUNT(*) FROM audit_log", [], |r| r.get(0))
            .unwrap();
        assert_eq!(remaining, 1);
    }

    /// A rule for a missing table succeeds and reports zero deletions.
    #[test]
    fn purge_nonexistent_table_returns_zero() {
        let pool = setup_pool();
        let missing = RetentionRule {
            table: "nonexistent_table".into(),
            timestamp_column: "created_at".into(),
            max_age_days: 7,
        };
        let outcome = purge_table(&pool, &missing).unwrap();
        assert_eq!(outcome.rows_deleted, 0);
    }

    /// Every default rule yields an event, whether or not its table exists.
    #[test]
    fn run_auto_purge_processes_all_rules() {
        let pool = setup_pool();
        // Default rules: audit_log (exists) + ipc_messages (does not)
        let outcomes = run_auto_purge(&pool).unwrap();
        assert_eq!(outcomes.len(), 2);
    }
}
244}