1use super::accounts::DEFAULT_ACCOUNT_ID;
7use super::DbPool;
8use crate::error::StorageError;
9use chrono::Utc;
10
/// Per-table row counts produced by a single cleanup pass.
///
/// All counters are `rows_affected()` values reported by SQLite for the
/// corresponding DELETE statements; `total_deleted` is their sum.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CleanupStats {
    /// Rows removed from `discovered_tweets` (both replied and unreplied).
    pub discovered_tweets_deleted: u64,
    /// Rows removed from `replies_sent`.
    pub replies_deleted: u64,
    /// Rows removed from `original_tweets`.
    pub original_tweets_deleted: u64,
    /// Rows removed from `threads`.
    pub threads_deleted: u64,
    /// Rows removed from `action_log`.
    pub action_log_deleted: u64,
    /// Sum of all per-table deletion counts above.
    pub total_deleted: u64,
    /// Whether VACUUM was executed after this pass (only when many rows were deleted).
    pub vacuum_run: bool,
}
29
30pub async fn run_cleanup_for(
43 pool: &DbPool,
44 account_id: &str,
45 retention_days: u32,
46) -> Result<CleanupStats, StorageError> {
47 let now = Utc::now();
48
49 let unreplied_cutoff = (now - chrono::Duration::days(7))
50 .format("%Y-%m-%dT%H:%M:%SZ")
51 .to_string();
52 let replied_cutoff = (now - chrono::Duration::days(i64::from(retention_days)))
53 .format("%Y-%m-%dT%H:%M:%SZ")
54 .to_string();
55 let action_log_cutoff = (now - chrono::Duration::days(14))
56 .format("%Y-%m-%dT%H:%M:%SZ")
57 .to_string();
58
59 let replies_result =
63 sqlx::query("DELETE FROM replies_sent WHERE created_at < ? AND account_id = ?")
64 .bind(&replied_cutoff)
65 .bind(account_id)
66 .execute(pool)
67 .await
68 .map_err(|e| StorageError::Query { source: e })?;
69 let replies_deleted = replies_result.rows_affected();
70
71 let unreplied_result = sqlx::query(
73 "DELETE FROM discovered_tweets WHERE replied_to = 0 AND discovered_at < ? AND account_id = ?",
74 )
75 .bind(&unreplied_cutoff)
76 .bind(account_id)
77 .execute(pool)
78 .await
79 .map_err(|e| StorageError::Query { source: e })?;
80
81 let replied_result = sqlx::query(
83 "DELETE FROM discovered_tweets WHERE replied_to = 1 AND discovered_at < ? AND account_id = ?",
84 )
85 .bind(&replied_cutoff)
86 .bind(account_id)
87 .execute(pool)
88 .await
89 .map_err(|e| StorageError::Query { source: e })?;
90
91 let discovered_tweets_deleted =
92 unreplied_result.rows_affected() + replied_result.rows_affected();
93
94 let originals_result =
96 sqlx::query("DELETE FROM original_tweets WHERE created_at < ? AND account_id = ?")
97 .bind(&replied_cutoff)
98 .bind(account_id)
99 .execute(pool)
100 .await
101 .map_err(|e| StorageError::Query { source: e })?;
102 let original_tweets_deleted = originals_result.rows_affected();
103
104 let threads_result = sqlx::query("DELETE FROM threads WHERE created_at < ? AND account_id = ?")
106 .bind(&replied_cutoff)
107 .bind(account_id)
108 .execute(pool)
109 .await
110 .map_err(|e| StorageError::Query { source: e })?;
111 let threads_deleted = threads_result.rows_affected();
112
113 let action_log_result =
115 sqlx::query("DELETE FROM action_log WHERE created_at < ? AND account_id = ?")
116 .bind(&action_log_cutoff)
117 .bind(account_id)
118 .execute(pool)
119 .await
120 .map_err(|e| StorageError::Query { source: e })?;
121 let action_log_deleted = action_log_result.rows_affected();
122
123 let total_deleted = discovered_tweets_deleted
124 + replies_deleted
125 + original_tweets_deleted
126 + threads_deleted
127 + action_log_deleted;
128
129 let vacuum_run = if total_deleted > 1000 {
130 sqlx::query("VACUUM")
131 .execute(pool)
132 .await
133 .map_err(|e| StorageError::Query { source: e })?;
134 true
135 } else {
136 false
137 };
138
139 let stats = CleanupStats {
140 discovered_tweets_deleted,
141 replies_deleted,
142 original_tweets_deleted,
143 threads_deleted,
144 action_log_deleted,
145 total_deleted,
146 vacuum_run,
147 };
148
149 tracing::info!(
150 discovered_tweets = stats.discovered_tweets_deleted,
151 replies = stats.replies_deleted,
152 original_tweets = stats.original_tweets_deleted,
153 threads = stats.threads_deleted,
154 action_log = stats.action_log_deleted,
155 total = stats.total_deleted,
156 vacuum = stats.vacuum_run,
157 "Cleanup completed"
158 );
159
160 Ok(stats)
161}
162
/// Runs [`run_cleanup_for`] against the default account.
///
/// Convenience wrapper for single-account deployments; `retention_days`
/// controls how long replied tweets, original tweets, and threads are kept.
///
/// # Errors
///
/// Propagates any [`StorageError`] from [`run_cleanup_for`].
pub async fn run_cleanup(pool: &DbPool, retention_days: u32) -> Result<CleanupStats, StorageError> {
    run_cleanup_for(pool, DEFAULT_ACCOUNT_ID, retention_days).await
}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    /// The current UTC time rendered in the same string format the cleanup
    /// cutoffs use — a row stamped with this is always "recent".
    fn now_string() -> String {
        Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
    }

    /// Seeds one `discovered_tweets` row with the given discovery time and
    /// replied flag. Other columns get fixed dummy values; account_id is
    /// presumably defaulted by the schema — verify against init_test_db.
    async fn insert_tweet_at(pool: &DbPool, id: &str, discovered_at: &str, replied_to: i64) {
        let sql = "INSERT INTO discovered_tweets (id, author_id, author_username, content, discovered_at, replied_to) VALUES (?, 'u1', 'user1', 'content', ?, ?)";
        sqlx::query(sql)
            .bind(id)
            .bind(discovered_at)
            .bind(replied_to)
            .execute(pool)
            .await
            .expect("insert tweet");
    }

    /// Seeds one `replies_sent` row with the given creation time.
    async fn insert_reply_at(pool: &DbPool, target_id: &str, created_at: &str) {
        let sql = "INSERT INTO replies_sent (target_tweet_id, reply_content, created_at) VALUES (?, 'reply text', ?)";
        sqlx::query(sql)
            .bind(target_id)
            .bind(created_at)
            .execute(pool)
            .await
            .expect("insert reply");
    }

    /// Seeds one successful `action_log` row with the given creation time.
    async fn insert_action_at(pool: &DbPool, created_at: &str) {
        let sql = "INSERT INTO action_log (action_type, status, created_at) VALUES ('search', 'success', ?)";
        sqlx::query(sql)
            .bind(created_at)
            .execute(pool)
            .await
            .expect("insert action");
    }

    #[tokio::test]
    async fn cleanup_deletes_unreplied_tweets_older_than_7_days() {
        let pool = init_test_db().await.expect("init db");

        // One tweet far beyond the 7-day window, one stamped right now.
        insert_tweet_at(&pool, "old_unreplied", "2020-01-01T00:00:00Z", 0).await;
        insert_tweet_at(&pool, "recent_unreplied", &now_string(), 0).await;

        let report = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(report.discovered_tweets_deleted, 1);

        // Only the recent tweet should survive.
        let remaining: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_unreplied'")
                .fetch_one(&pool)
                .await
                .expect("count");
        assert_eq!(remaining.0, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_replied_tweets_older_than_retention() {
        let pool = init_test_db().await.expect("init db");

        // Replied tweets are governed by the retention period (90 days here).
        insert_tweet_at(&pool, "old_replied", "2020-01-01T00:00:00Z", 1).await;
        insert_tweet_at(&pool, "recent_replied", &now_string(), 1).await;

        let report = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(report.discovered_tweets_deleted, 1);

        let remaining: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_replied'")
                .fetch_one(&pool)
                .await
                .expect("count");
        assert_eq!(remaining.0, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_replies() {
        let pool = init_test_db().await.expect("init db");

        insert_reply_at(&pool, "t1", "2020-01-01T00:00:00Z").await;
        insert_reply_at(&pool, "t2", &now_string()).await;

        // Only the stale reply falls before the retention cutoff.
        let report = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(report.replies_deleted, 1);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_action_log_entries() {
        let pool = init_test_db().await.expect("init db");

        insert_action_at(&pool, "2020-01-01T00:00:00Z").await;
        insert_action_at(&pool, &now_string()).await;

        // The action log uses its own fixed 14-day window.
        let report = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(report.action_log_deleted, 1);
    }

    #[tokio::test]
    async fn cleanup_never_deletes_rate_limits() {
        let pool = init_test_db().await.expect("init db");

        // Even an ancient rate-limit row must remain untouched.
        let sql = "INSERT INTO rate_limits (action_type, request_count, period_start, max_requests, period_seconds) VALUES ('reply', 5, '2020-01-01T00:00:00Z', 20, 86400)";
        sqlx::query(sql)
            .execute(&pool)
            .await
            .expect("insert rate limit");

        run_cleanup(&pool, 90).await.expect("cleanup");

        let remaining: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM rate_limits")
            .fetch_one(&pool)
            .await
            .expect("count");
        assert_eq!(remaining.0, 1, "rate_limits should never be deleted");
    }

    #[tokio::test]
    async fn cleanup_empty_database_returns_zero_stats() {
        let pool = init_test_db().await.expect("init db");

        // Nothing to delete: zero totals and no VACUUM.
        let report = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(report.total_deleted, 0);
        assert!(!report.vacuum_run);
    }

    #[tokio::test]
    async fn cleanup_deletes_old_threads_with_cascade() {
        let pool = init_test_db().await.expect("init db");

        let thread_sql = "INSERT INTO threads (topic, tweet_count, created_at, status) VALUES ('old topic', 3, '2020-01-01T00:00:00Z', 'sent')";
        sqlx::query(thread_sql)
            .execute(&pool)
            .await
            .expect("insert thread");

        let tweet_sql = "INSERT INTO thread_tweets (thread_id, position, content, created_at) VALUES (1, 0, 'tweet 0', '2020-01-01T00:00:00Z')";
        sqlx::query(tweet_sql)
            .execute(&pool)
            .await
            .expect("insert thread tweet");

        let report = run_cleanup(&pool, 90).await.expect("cleanup");
        assert_eq!(report.threads_deleted, 1);

        // Child rows should be gone too — presumably via ON DELETE CASCADE
        // in the schema (cleanup itself never touches thread_tweets).
        let remaining: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM thread_tweets")
            .fetch_one(&pool)
            .await
            .expect("count");
        assert_eq!(remaining.0, 0);
    }
}