// tuitbot_core/storage/cleanup.rs
1//! Data retention cleanup for bounded database growth.
2//!
3//! Prunes old records according to retention rules while preserving
4//! rate limit counters and records needed for deduplication.
5
6use super::accounts::DEFAULT_ACCOUNT_ID;
7use super::DbPool;
8use crate::error::StorageError;
9use chrono::Utc;
10
11/// Statistics from a cleanup run.
/// Statistics from a cleanup run.
///
/// Serializable so callers (e.g. a status command or log sink) can emit
/// the outcome of a cleanup pass as JSON.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CleanupStats {
    /// Number of discovered tweets deleted (both unreplied and replied rows).
    pub discovered_tweets_deleted: u64,
    /// Number of reply records deleted.
    pub replies_deleted: u64,
    /// Number of original tweet records deleted.
    pub original_tweets_deleted: u64,
    /// Number of thread records deleted (thread_tweets rows go with them via CASCADE).
    pub threads_deleted: u64,
    /// Number of action log entries deleted.
    pub action_log_deleted: u64,
    /// Total records deleted across all tables (sum of the fields above).
    pub total_deleted: u64,
    /// Whether VACUUM was run to reclaim disk space after the deletions.
    pub vacuum_run: bool,
}
29
30/// Run data retention cleanup for a specific account, pruning old records per retention rules.
31///
32/// Retention rules:
33/// - Unreplied discovered tweets: 7 days (fixed).
34/// - Replied discovered tweets: `retention_days` (configurable, default 90).
35/// - Replies: `retention_days`.
36/// - Original tweets: `retention_days`.
37/// - Threads: `retention_days` (CASCADE deletes thread_tweets).
38/// - Action log: 14 days (fixed).
39/// - Rate limits: NEVER deleted.
40///
41/// Runs VACUUM if more than 1000 total rows were deleted.
42pub async fn run_cleanup_for(
43    pool: &DbPool,
44    account_id: &str,
45    retention_days: u32,
46) -> Result<CleanupStats, StorageError> {
47    let now = Utc::now();
48
49    let unreplied_cutoff = (now - chrono::Duration::days(7))
50        .format("%Y-%m-%dT%H:%M:%SZ")
51        .to_string();
52    let replied_cutoff = (now - chrono::Duration::days(i64::from(retention_days)))
53        .format("%Y-%m-%dT%H:%M:%SZ")
54        .to_string();
55    let action_log_cutoff = (now - chrono::Duration::days(14))
56        .format("%Y-%m-%dT%H:%M:%SZ")
57        .to_string();
58
59    // Delete child records before parent records for FK constraints.
60
61    // 1. Delete old replies first (before their parent discovered_tweets).
62    let replies_result =
63        sqlx::query("DELETE FROM replies_sent WHERE created_at < ? AND account_id = ?")
64            .bind(&replied_cutoff)
65            .bind(account_id)
66            .execute(pool)
67            .await
68            .map_err(|e| StorageError::Query { source: e })?;
69    let replies_deleted = replies_result.rows_affected();
70
71    // 2. Delete unreplied discovered tweets older than 7 days.
72    let unreplied_result = sqlx::query(
73        "DELETE FROM discovered_tweets WHERE replied_to = 0 AND discovered_at < ? AND account_id = ?",
74    )
75    .bind(&unreplied_cutoff)
76    .bind(account_id)
77    .execute(pool)
78    .await
79    .map_err(|e| StorageError::Query { source: e })?;
80
81    // 3. Delete replied discovered tweets older than retention_days.
82    let replied_result = sqlx::query(
83        "DELETE FROM discovered_tweets WHERE replied_to = 1 AND discovered_at < ? AND account_id = ?",
84    )
85    .bind(&replied_cutoff)
86    .bind(account_id)
87    .execute(pool)
88    .await
89    .map_err(|e| StorageError::Query { source: e })?;
90
91    let discovered_tweets_deleted =
92        unreplied_result.rows_affected() + replied_result.rows_affected();
93
94    // 4. Delete old original tweets.
95    let originals_result =
96        sqlx::query("DELETE FROM original_tweets WHERE created_at < ? AND account_id = ?")
97            .bind(&replied_cutoff)
98            .bind(account_id)
99            .execute(pool)
100            .await
101            .map_err(|e| StorageError::Query { source: e })?;
102    let original_tweets_deleted = originals_result.rows_affected();
103
104    // 5. Delete old threads (CASCADE deletes thread_tweets).
105    let threads_result = sqlx::query("DELETE FROM threads WHERE created_at < ? AND account_id = ?")
106        .bind(&replied_cutoff)
107        .bind(account_id)
108        .execute(pool)
109        .await
110        .map_err(|e| StorageError::Query { source: e })?;
111    let threads_deleted = threads_result.rows_affected();
112
113    // 6. Delete old action log entries.
114    let action_log_result =
115        sqlx::query("DELETE FROM action_log WHERE created_at < ? AND account_id = ?")
116            .bind(&action_log_cutoff)
117            .bind(account_id)
118            .execute(pool)
119            .await
120            .map_err(|e| StorageError::Query { source: e })?;
121    let action_log_deleted = action_log_result.rows_affected();
122
123    let total_deleted = discovered_tweets_deleted
124        + replies_deleted
125        + original_tweets_deleted
126        + threads_deleted
127        + action_log_deleted;
128
129    let vacuum_run = if total_deleted > 1000 {
130        sqlx::query("VACUUM")
131            .execute(pool)
132            .await
133            .map_err(|e| StorageError::Query { source: e })?;
134        true
135    } else {
136        false
137    };
138
139    let stats = CleanupStats {
140        discovered_tweets_deleted,
141        replies_deleted,
142        original_tweets_deleted,
143        threads_deleted,
144        action_log_deleted,
145        total_deleted,
146        vacuum_run,
147    };
148
149    tracing::info!(
150        discovered_tweets = stats.discovered_tweets_deleted,
151        replies = stats.replies_deleted,
152        original_tweets = stats.original_tweets_deleted,
153        threads = stats.threads_deleted,
154        action_log = stats.action_log_deleted,
155        total = stats.total_deleted,
156        vacuum = stats.vacuum_run,
157        "Cleanup completed"
158    );
159
160    Ok(stats)
161}
162
163/// Run data retention cleanup, pruning old records per retention rules.
164///
165/// Retention rules:
166/// - Unreplied discovered tweets: 7 days (fixed).
167/// - Replied discovered tweets: `retention_days` (configurable, default 90).
168/// - Replies: `retention_days`.
169/// - Original tweets: `retention_days`.
170/// - Threads: `retention_days` (CASCADE deletes thread_tweets).
171/// - Action log: 14 days (fixed).
172/// - Rate limits: NEVER deleted.
173///
174/// Runs VACUUM if more than 1000 total rows were deleted.
175pub async fn run_cleanup(pool: &DbPool, retention_days: u32) -> Result<CleanupStats, StorageError> {
176    run_cleanup_for(pool, DEFAULT_ACCOUNT_ID, retention_days).await
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    /// Insert a discovered tweet with a specific timestamp and replied_to status.
    /// Uses the default account (no account_id bound), matching `run_cleanup`.
    async fn insert_tweet_at(pool: &DbPool, id: &str, discovered_at: &str, replied_to: i64) {
        sqlx::query(
            "INSERT INTO discovered_tweets \
             (id, author_id, author_username, content, discovered_at, replied_to) \
             VALUES (?, 'u1', 'user1', 'content', ?, ?)",
        )
        .bind(id)
        .bind(discovered_at)
        .bind(replied_to)
        .execute(pool)
        .await
        .expect("insert tweet");
    }

    /// Insert a reply with a specific timestamp.
    async fn insert_reply_at(pool: &DbPool, target_id: &str, created_at: &str) {
        sqlx::query(
            "INSERT INTO replies_sent (target_tweet_id, reply_content, created_at) \
             VALUES (?, 'reply text', ?)",
        )
        .bind(target_id)
        .bind(created_at)
        .execute(pool)
        .await
        .expect("insert reply");
    }

    /// Insert an action log entry with a specific timestamp.
    async fn insert_action_at(pool: &DbPool, created_at: &str) {
        sqlx::query(
            "INSERT INTO action_log (action_type, status, created_at) \
             VALUES ('search', 'success', ?)",
        )
        .bind(created_at)
        .execute(pool)
        .await
        .expect("insert action");
    }

    #[tokio::test]
    async fn cleanup_deletes_unreplied_tweets_older_than_7_days() {
        let pool = init_test_db().await.expect("init db");

        // Old unreplied tweet (fixed past date, well beyond the 7-day window)
        insert_tweet_at(&pool, "old_unreplied", "2020-01-01T00:00:00Z", 0).await;
        // Recent unreplied tweet (now)
        insert_tweet_at(
            &pool,
            "recent_unreplied",
            &Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
            0,
        )
        .await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.discovered_tweets_deleted, 1);

        // Verify the recent one still exists
        let count: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_unreplied'")
                .fetch_one(&pool)
                .await
                .expect("count");
        assert_eq!(count.0, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_replied_tweets_older_than_retention() {
        let pool = init_test_db().await.expect("init db");

        // Old replied tweet (fixed past date, well beyond the 90-day retention)
        insert_tweet_at(&pool, "old_replied", "2020-01-01T00:00:00Z", 1).await;
        // Recent replied tweet (now)
        insert_tweet_at(
            &pool,
            "recent_replied",
            &Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
            1,
        )
        .await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.discovered_tweets_deleted, 1);

        let count: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_replied'")
                .fetch_one(&pool)
                .await
                .expect("count");
        assert_eq!(count.0, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_replies() {
        let pool = init_test_db().await.expect("init db");

        // One reply far past retention, one created now; only the old one goes.
        insert_reply_at(&pool, "t1", "2020-01-01T00:00:00Z").await;
        insert_reply_at(
            &pool,
            "t2",
            &Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
        )
        .await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.replies_deleted, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_action_log_entries() {
        let pool = init_test_db().await.expect("init db");

        // Fixed past date, well beyond the 14-day action-log window
        insert_action_at(&pool, "2020-01-01T00:00:00Z").await;
        // Recent
        insert_action_at(&pool, &Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()).await;

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.action_log_deleted, 1);
    }

    #[tokio::test]
    async fn cleanup_never_deletes_rate_limits() {
        let pool = init_test_db().await.expect("init db");

        // Insert a rate limit row with an ancient period_start; cleanup must
        // still leave it untouched — rate limits are exempt from retention.
        sqlx::query(
            "INSERT INTO rate_limits (action_type, request_count, period_start, max_requests, period_seconds) \
             VALUES ('reply', 5, '2020-01-01T00:00:00Z', 20, 86400)",
        )
        .execute(&pool)
        .await
        .expect("insert rate limit");

        run_cleanup(&pool, 90).await.expect("cleanup");

        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM rate_limits")
            .fetch_one(&pool)
            .await
            .expect("count");
        assert_eq!(count.0, 1, "rate_limits should never be deleted");
    }

    #[tokio::test]
    async fn cleanup_empty_database_returns_zero_stats() {
        let pool = init_test_db().await.expect("init db");

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.total_deleted, 0);
        assert!(!stats.vacuum_run);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_threads_with_cascade() {
        let pool = init_test_db().await.expect("init db");

        // Insert an old thread (past retention)
        sqlx::query(
            "INSERT INTO threads (topic, tweet_count, created_at, status) \
             VALUES ('old topic', 3, '2020-01-01T00:00:00Z', 'sent')",
        )
        .execute(&pool)
        .await
        .expect("insert thread");

        // Insert thread tweets referencing thread id 1 (should cascade delete)
        sqlx::query(
            "INSERT INTO thread_tweets (thread_id, position, content, created_at) \
             VALUES (1, 0, 'tweet 0', '2020-01-01T00:00:00Z')",
        )
        .execute(&pool)
        .await
        .expect("insert thread tweet");

        let stats = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(stats.threads_deleted, 1);

        // Verify thread_tweets were cascaded
        let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM thread_tweets")
            .fetch_one(&pool)
            .await
            .expect("count");
        assert_eq!(count.0, 0);
    }
}
369}