Skip to main content

tuitbot_core/storage/
cleanup.rs

1//! Data retention cleanup for bounded database growth.
2//!
3//! Prunes old records according to retention rules while preserving
4//! rate limit counters and records needed for deduplication.
5
6use super::DbPool;
7use crate::error::StorageError;
8use chrono::Utc;
9
10/// Statistics from a cleanup run.
11#[derive(Debug, Clone, serde::Serialize)]
12pub struct CleanupStats {
13    /// Number of discovered tweets deleted.
14    pub discovered_tweets_deleted: u64,
15    /// Number of reply records deleted.
16    pub replies_deleted: u64,
17    /// Number of original tweet records deleted.
18    pub original_tweets_deleted: u64,
19    /// Number of thread records deleted (thread_tweets cascade).
20    pub threads_deleted: u64,
21    /// Number of action log entries deleted.
22    pub action_log_deleted: u64,
23    /// Total records deleted across all tables.
24    pub total_deleted: u64,
25    /// Whether VACUUM was run to reclaim disk space.
26    pub vacuum_run: bool,
27}
28
29/// Run data retention cleanup, pruning old records per retention rules.
30///
31/// Retention rules:
32/// - Unreplied discovered tweets: 7 days (fixed).
33/// - Replied discovered tweets: `retention_days` (configurable, default 90).
34/// - Replies: `retention_days`.
35/// - Original tweets: `retention_days`.
36/// - Threads: `retention_days` (CASCADE deletes thread_tweets).
37/// - Action log: 14 days (fixed).
38/// - Rate limits: NEVER deleted.
39///
40/// Runs VACUUM if more than 1000 total rows were deleted.
41pub async fn run_cleanup(pool: &DbPool, retention_days: u32) -> Result<CleanupStats, StorageError> {
42    let now = Utc::now();
43
44    let unreplied_cutoff = (now - chrono::Duration::days(7))
45        .format("%Y-%m-%dT%H:%M:%SZ")
46        .to_string();
47    let replied_cutoff = (now - chrono::Duration::days(i64::from(retention_days)))
48        .format("%Y-%m-%dT%H:%M:%SZ")
49        .to_string();
50    let action_log_cutoff = (now - chrono::Duration::days(14))
51        .format("%Y-%m-%dT%H:%M:%SZ")
52        .to_string();
53
54    // Delete child records before parent records for FK constraints.
55
56    // 1. Delete old replies first (before their parent discovered_tweets).
57    let replies_result = sqlx::query("DELETE FROM replies_sent WHERE created_at < ?")
58        .bind(&replied_cutoff)
59        .execute(pool)
60        .await
61        .map_err(|e| StorageError::Query { source: e })?;
62    let replies_deleted = replies_result.rows_affected();
63
64    // 2. Delete unreplied discovered tweets older than 7 days.
65    let unreplied_result =
66        sqlx::query("DELETE FROM discovered_tweets WHERE replied_to = 0 AND discovered_at < ?")
67            .bind(&unreplied_cutoff)
68            .execute(pool)
69            .await
70            .map_err(|e| StorageError::Query { source: e })?;
71
72    // 3. Delete replied discovered tweets older than retention_days.
73    let replied_result =
74        sqlx::query("DELETE FROM discovered_tweets WHERE replied_to = 1 AND discovered_at < ?")
75            .bind(&replied_cutoff)
76            .execute(pool)
77            .await
78            .map_err(|e| StorageError::Query { source: e })?;
79
80    let discovered_tweets_deleted =
81        unreplied_result.rows_affected() + replied_result.rows_affected();
82
83    // 4. Delete old original tweets.
84    let originals_result = sqlx::query("DELETE FROM original_tweets WHERE created_at < ?")
85        .bind(&replied_cutoff)
86        .execute(pool)
87        .await
88        .map_err(|e| StorageError::Query { source: e })?;
89    let original_tweets_deleted = originals_result.rows_affected();
90
91    // 5. Delete old threads (CASCADE deletes thread_tweets).
92    let threads_result = sqlx::query("DELETE FROM threads WHERE created_at < ?")
93        .bind(&replied_cutoff)
94        .execute(pool)
95        .await
96        .map_err(|e| StorageError::Query { source: e })?;
97    let threads_deleted = threads_result.rows_affected();
98
99    // 6. Delete old action log entries.
100    let action_log_result = sqlx::query("DELETE FROM action_log WHERE created_at < ?")
101        .bind(&action_log_cutoff)
102        .execute(pool)
103        .await
104        .map_err(|e| StorageError::Query { source: e })?;
105    let action_log_deleted = action_log_result.rows_affected();
106
107    let total_deleted = discovered_tweets_deleted
108        + replies_deleted
109        + original_tweets_deleted
110        + threads_deleted
111        + action_log_deleted;
112
113    let vacuum_run = if total_deleted > 1000 {
114        sqlx::query("VACUUM")
115            .execute(pool)
116            .await
117            .map_err(|e| StorageError::Query { source: e })?;
118        true
119    } else {
120        false
121    };
122
123    let stats = CleanupStats {
124        discovered_tweets_deleted,
125        replies_deleted,
126        original_tweets_deleted,
127        threads_deleted,
128        action_log_deleted,
129        total_deleted,
130        vacuum_run,
131    };
132
133    tracing::info!(
134        discovered_tweets = stats.discovered_tweets_deleted,
135        replies = stats.replies_deleted,
136        original_tweets = stats.original_tweets_deleted,
137        threads = stats.threads_deleted,
138        action_log = stats.action_log_deleted,
139        total = stats.total_deleted,
140        vacuum = stats.vacuum_run,
141        "Cleanup completed"
142    );
143
144    Ok(stats)
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    /// A timestamp older than every retention window exercised here (7, 14 and 90 days).
    const ANCIENT: &str = "2020-01-01T00:00:00Z";

    /// The current time, formatted the same way as the cleanup cutoffs.
    fn now_timestamp() -> String {
        Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
    }

    /// Run a `SELECT COUNT(*) ...` query and return the count.
    async fn count_rows(pool: &DbPool, sql: &'static str) -> i64 {
        let (n,): (i64,) = sqlx::query_as(sql)
            .fetch_one(pool)
            .await
            .expect("count");
        n
    }

    /// Insert a discovered tweet with a specific timestamp and replied_to status.
    async fn insert_tweet_at(pool: &DbPool, id: &str, discovered_at: &str, replied_to: i64) {
        sqlx::query(
            "INSERT INTO discovered_tweets \
             (id, author_id, author_username, content, discovered_at, replied_to) \
             VALUES (?, 'u1', 'user1', 'content', ?, ?)",
        )
        .bind(id)
        .bind(discovered_at)
        .bind(replied_to)
        .execute(pool)
        .await
        .expect("insert tweet");
    }

    /// Insert a reply with a specific timestamp.
    async fn insert_reply_at(pool: &DbPool, target_id: &str, created_at: &str) {
        sqlx::query(
            "INSERT INTO replies_sent (target_tweet_id, reply_content, created_at) \
             VALUES (?, 'reply text', ?)",
        )
        .bind(target_id)
        .bind(created_at)
        .execute(pool)
        .await
        .expect("insert reply");
    }

    /// Insert an action log entry with a specific timestamp.
    async fn insert_action_at(pool: &DbPool, created_at: &str) {
        sqlx::query(
            "INSERT INTO action_log (action_type, status, created_at) \
             VALUES ('search', 'success', ?)",
        )
        .bind(created_at)
        .execute(pool)
        .await
        .expect("insert action");
    }

    #[tokio::test]
    async fn cleanup_deletes_unreplied_tweets_older_than_7_days() {
        let pool = init_test_db().await.expect("init db");

        // Unreplied and far older than the fixed 7-day window.
        insert_tweet_at(&pool, "old_unreplied", ANCIENT, 0).await;
        // Unreplied but fresh.
        insert_tweet_at(&pool, "recent_unreplied", &now_timestamp(), 0).await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.discovered_tweets_deleted, 1);

        let remaining = count_rows(
            &pool,
            "SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_unreplied'",
        )
        .await;
        assert_eq!(remaining, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_replied_tweets_older_than_retention() {
        let pool = init_test_db().await.expect("init db");

        // Replied and far older than the 90-day retention window.
        insert_tweet_at(&pool, "old_replied", ANCIENT, 1).await;
        // Replied but fresh.
        insert_tweet_at(&pool, "recent_replied", &now_timestamp(), 1).await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.discovered_tweets_deleted, 1);

        let remaining = count_rows(
            &pool,
            "SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_replied'",
        )
        .await;
        assert_eq!(remaining, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_replies() {
        let pool = init_test_db().await.expect("init db");

        insert_reply_at(&pool, "t1", ANCIENT).await;
        insert_reply_at(&pool, "t2", &now_timestamp()).await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.replies_deleted, 1);

        let remaining = count_rows(
            &pool,
            "SELECT COUNT(*) FROM replies_sent WHERE target_tweet_id = 't2'",
        )
        .await;
        assert_eq!(remaining, 1, "recent reply should survive cleanup");
    }

    #[tokio::test]
    async fn cleanup_deletes_old_action_log_entries() {
        let pool = init_test_db().await.expect("init db");

        // Far older than the fixed 14-day window.
        insert_action_at(&pool, ANCIENT).await;
        // Fresh.
        insert_action_at(&pool, &now_timestamp()).await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.action_log_deleted, 1);

        let remaining = count_rows(&pool, "SELECT COUNT(*) FROM action_log").await;
        assert_eq!(remaining, 1, "recent action log entry should survive cleanup");
    }

    #[tokio::test]
    async fn cleanup_never_deletes_rate_limits() {
        let pool = init_test_db().await.expect("init db");

        // A rate limit row whose period started long ago.
        sqlx::query(
            "INSERT INTO rate_limits (action_type, request_count, period_start, max_requests, period_seconds) \
             VALUES ('reply', 5, '2020-01-01T00:00:00Z', 20, 86400)",
        )
        .execute(&pool)
        .await
        .expect("insert rate limit");

        run_cleanup(&pool, 90).await.expect("cleanup");

        let remaining = count_rows(&pool, "SELECT COUNT(*) FROM rate_limits").await;
        assert_eq!(remaining, 1, "rate_limits should never be deleted");
    }

    #[tokio::test]
    async fn cleanup_empty_database_returns_zero_stats() {
        let pool = init_test_db().await.expect("init db");

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.discovered_tweets_deleted, 0);
        assert_eq!(stats.replies_deleted, 0);
        assert_eq!(stats.original_tweets_deleted, 0);
        assert_eq!(stats.threads_deleted, 0);
        assert_eq!(stats.action_log_deleted, 0);
        assert_eq!(stats.total_deleted, 0);
        assert!(!stats.vacuum_run);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_threads_with_cascade() {
        let pool = init_test_db().await.expect("init db");

        // An old thread; use the actual inserted id rather than assuming it is 1.
        let thread_id = sqlx::query(
            "INSERT INTO threads (topic, tweet_count, created_at, status) \
             VALUES ('old topic', 3, ?, 'sent')",
        )
        .bind(ANCIENT)
        .execute(&pool)
        .await
        .expect("insert thread")
        .last_insert_rowid();

        // A child tweet that should disappear with its thread.
        sqlx::query(
            "INSERT INTO thread_tweets (thread_id, position, content, created_at) \
             VALUES (?, 0, 'tweet 0', ?)",
        )
        .bind(thread_id)
        .bind(ANCIENT)
        .execute(&pool)
        .await
        .expect("insert thread tweet");

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.threads_deleted, 1);

        // Verify thread_tweets were cascaded.
        let remaining = count_rows(&pool, "SELECT COUNT(*) FROM thread_tweets").await;
        assert_eq!(remaining, 0);
    }

    #[tokio::test]
    async fn cleanup_vacuums_after_deleting_more_than_1000_rows() {
        let pool = init_test_db().await.expect("init db");

        for _ in 0..1001 {
            insert_action_at(&pool, ANCIENT).await;
        }

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.action_log_deleted, 1001);
        assert_eq!(stats.total_deleted, 1001);
        assert!(stats.vacuum_run, "more than 1000 deletions should trigger VACUUM");
    }

    #[tokio::test]
    async fn cleanup_skips_vacuum_at_exactly_1000_rows() {
        let pool = init_test_db().await.expect("init db");

        for _ in 0..1000 {
            insert_action_at(&pool, ANCIENT).await;
        }

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.total_deleted, 1000);
        assert!(!stats.vacuum_run, "the VACUUM threshold is strictly greater than 1000");
    }
}
337}