1use super::DbPool;
7use crate::error::StorageError;
8use chrono::Utc;
9
10#[derive(Debug, Clone, serde::Serialize)]
12pub struct CleanupStats {
13 pub discovered_tweets_deleted: u64,
15 pub replies_deleted: u64,
17 pub original_tweets_deleted: u64,
19 pub threads_deleted: u64,
21 pub action_log_deleted: u64,
23 pub total_deleted: u64,
25 pub vacuum_run: bool,
27}
28
29pub async fn run_cleanup(pool: &DbPool, retention_days: u32) -> Result<CleanupStats, StorageError> {
42 let now = Utc::now();
43
44 let unreplied_cutoff = (now - chrono::Duration::days(7))
45 .format("%Y-%m-%dT%H:%M:%SZ")
46 .to_string();
47 let replied_cutoff = (now - chrono::Duration::days(i64::from(retention_days)))
48 .format("%Y-%m-%dT%H:%M:%SZ")
49 .to_string();
50 let action_log_cutoff = (now - chrono::Duration::days(14))
51 .format("%Y-%m-%dT%H:%M:%SZ")
52 .to_string();
53
54 let replies_result = sqlx::query("DELETE FROM replies_sent WHERE created_at < ?")
58 .bind(&replied_cutoff)
59 .execute(pool)
60 .await
61 .map_err(|e| StorageError::Query { source: e })?;
62 let replies_deleted = replies_result.rows_affected();
63
64 let unreplied_result =
66 sqlx::query("DELETE FROM discovered_tweets WHERE replied_to = 0 AND discovered_at < ?")
67 .bind(&unreplied_cutoff)
68 .execute(pool)
69 .await
70 .map_err(|e| StorageError::Query { source: e })?;
71
72 let replied_result =
74 sqlx::query("DELETE FROM discovered_tweets WHERE replied_to = 1 AND discovered_at < ?")
75 .bind(&replied_cutoff)
76 .execute(pool)
77 .await
78 .map_err(|e| StorageError::Query { source: e })?;
79
80 let discovered_tweets_deleted =
81 unreplied_result.rows_affected() + replied_result.rows_affected();
82
83 let originals_result = sqlx::query("DELETE FROM original_tweets WHERE created_at < ?")
85 .bind(&replied_cutoff)
86 .execute(pool)
87 .await
88 .map_err(|e| StorageError::Query { source: e })?;
89 let original_tweets_deleted = originals_result.rows_affected();
90
91 let threads_result = sqlx::query("DELETE FROM threads WHERE created_at < ?")
93 .bind(&replied_cutoff)
94 .execute(pool)
95 .await
96 .map_err(|e| StorageError::Query { source: e })?;
97 let threads_deleted = threads_result.rows_affected();
98
99 let action_log_result = sqlx::query("DELETE FROM action_log WHERE created_at < ?")
101 .bind(&action_log_cutoff)
102 .execute(pool)
103 .await
104 .map_err(|e| StorageError::Query { source: e })?;
105 let action_log_deleted = action_log_result.rows_affected();
106
107 let total_deleted = discovered_tweets_deleted
108 + replies_deleted
109 + original_tweets_deleted
110 + threads_deleted
111 + action_log_deleted;
112
113 let vacuum_run = if total_deleted > 1000 {
114 sqlx::query("VACUUM")
115 .execute(pool)
116 .await
117 .map_err(|e| StorageError::Query { source: e })?;
118 true
119 } else {
120 false
121 };
122
123 let stats = CleanupStats {
124 discovered_tweets_deleted,
125 replies_deleted,
126 original_tweets_deleted,
127 threads_deleted,
128 action_log_deleted,
129 total_deleted,
130 vacuum_run,
131 };
132
133 tracing::info!(
134 discovered_tweets = stats.discovered_tweets_deleted,
135 replies = stats.replies_deleted,
136 original_tweets = stats.original_tweets_deleted,
137 threads = stats.threads_deleted,
138 action_log = stats.action_log_deleted,
139 total = stats.total_deleted,
140 vacuum = stats.vacuum_run,
141 "Cleanup completed"
142 );
143
144 Ok(stats)
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::init_test_db;

    /// Timestamp well outside every retention window exercised below.
    const ANCIENT: &str = "2020-01-01T00:00:00Z";

    /// Current UTC time rendered in the same layout the fixtures use.
    fn now_stamp() -> String {
        Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()
    }

    /// Runs a `SELECT COUNT(*)` style query and returns its single value.
    async fn count_rows(pool: &DbPool, sql: &'static str) -> i64 {
        let (n,): (i64,) = sqlx::query_as(sql)
            .fetch_one(pool)
            .await
            .expect("count");
        n
    }

    /// Inserts a `discovered_tweets` row with the given discovery time and
    /// replied flag.
    async fn insert_tweet_at(pool: &DbPool, id: &str, discovered_at: &str, replied_to: i64) {
        let insert = sqlx::query(
            "INSERT INTO discovered_tweets \
             (id, author_id, author_username, content, discovered_at, replied_to) \
             VALUES (?, 'u1', 'user1', 'content', ?, ?)",
        )
        .bind(id)
        .bind(discovered_at)
        .bind(replied_to);

        insert.execute(pool).await.expect("insert tweet");
    }

    /// Inserts a `replies_sent` row created at the given time.
    async fn insert_reply_at(pool: &DbPool, target_id: &str, created_at: &str) {
        let insert = sqlx::query(
            "INSERT INTO replies_sent (target_tweet_id, reply_content, created_at) \
             VALUES (?, 'reply text', ?)",
        )
        .bind(target_id)
        .bind(created_at);

        insert.execute(pool).await.expect("insert reply");
    }

    /// Inserts an `action_log` row created at the given time.
    async fn insert_action_at(pool: &DbPool, created_at: &str) {
        let insert = sqlx::query(
            "INSERT INTO action_log (action_type, status, created_at) \
             VALUES ('search', 'success', ?)",
        )
        .bind(created_at);

        insert.execute(pool).await.expect("insert action");
    }

    /// Only the old unreplied tweet is removed; the fresh one survives.
    #[tokio::test]
    async fn cleanup_deletes_unreplied_tweets_older_than_7_days() {
        let db = init_test_db().await.expect("init db");

        insert_tweet_at(&db, "old_unreplied", ANCIENT, 0).await;
        insert_tweet_at(&db, "recent_unreplied", &now_stamp(), 0).await;

        let report = run_cleanup(&db, 90).await.expect("cleanup");
        assert_eq!(report.discovered_tweets_deleted, 1);

        let remaining = count_rows(
            &db,
            "SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_unreplied'",
        )
        .await;
        assert_eq!(remaining, 1);
    }

    /// Only the old replied tweet is removed; the fresh one survives.
    #[tokio::test]
    async fn cleanup_deletes_replied_tweets_older_than_retention() {
        let db = init_test_db().await.expect("init db");

        insert_tweet_at(&db, "old_replied", ANCIENT, 1).await;
        insert_tweet_at(&db, "recent_replied", &now_stamp(), 1).await;

        let report = run_cleanup(&db, 90).await.expect("cleanup");
        assert_eq!(report.discovered_tweets_deleted, 1);

        let remaining = count_rows(
            &db,
            "SELECT COUNT(*) FROM discovered_tweets WHERE id = 'recent_replied'",
        )
        .await;
        assert_eq!(remaining, 1);
    }

    /// Of one old and one fresh reply, exactly one is reported deleted.
    #[tokio::test]
    async fn cleanup_deletes_old_replies() {
        let db = init_test_db().await.expect("init db");

        insert_reply_at(&db, "t1", ANCIENT).await;
        insert_reply_at(&db, "t2", &now_stamp()).await;

        let report = run_cleanup(&db, 90).await.expect("cleanup");
        assert_eq!(report.replies_deleted, 1);
    }

    /// Of one old and one fresh action-log entry, exactly one is reported deleted.
    #[tokio::test]
    async fn cleanup_deletes_old_action_log_entries() {
        let db = init_test_db().await.expect("init db");

        insert_action_at(&db, ANCIENT).await;
        insert_action_at(&db, &now_stamp()).await;

        let report = run_cleanup(&db, 90).await.expect("cleanup");
        assert_eq!(report.action_log_deleted, 1);
    }

    /// An old `rate_limits` row is still present after cleanup.
    #[tokio::test]
    async fn cleanup_never_deletes_rate_limits() {
        let db = init_test_db().await.expect("init db");

        sqlx::query(
            "INSERT INTO rate_limits (action_type, request_count, period_start, max_requests, period_seconds) \
             VALUES ('reply', 5, '2020-01-01T00:00:00Z', 20, 86400)",
        )
        .execute(&db)
        .await
        .expect("insert rate limit");

        run_cleanup(&db, 90).await.expect("cleanup");

        let remaining = count_rows(&db, "SELECT COUNT(*) FROM rate_limits").await;
        assert_eq!(remaining, 1, "rate_limits should never be deleted");
    }

    /// With nothing to delete, the total is zero and no vacuum is reported.
    #[tokio::test]
    async fn cleanup_empty_database_returns_zero_stats() {
        let db = init_test_db().await.expect("init db");

        let report = run_cleanup(&db, 90).await.expect("cleanup");
        assert_eq!(report.total_deleted, 0);
        assert!(!report.vacuum_run);
    }

    /// Removing an old thread leaves no `thread_tweets` rows behind
    /// (presumably via a cascading foreign key in the schema — confirm).
    #[tokio::test]
    async fn cleanup_deletes_old_threads_with_cascade() {
        let db = init_test_db().await.expect("init db");

        sqlx::query(
            "INSERT INTO threads (topic, tweet_count, created_at, status) \
             VALUES ('old topic', 3, '2020-01-01T00:00:00Z', 'sent')",
        )
        .execute(&db)
        .await
        .expect("insert thread");

        sqlx::query(
            "INSERT INTO thread_tweets (thread_id, position, content, created_at) \
             VALUES (1, 0, 'tweet 0', '2020-01-01T00:00:00Z')",
        )
        .execute(&db)
        .await
        .expect("insert thread tweet");

        let report = run_cleanup(&db, 90).await.expect("cleanup");
        assert_eq!(report.threads_deleted, 1);

        let orphaned = count_rows(&db, "SELECT COUNT(*) FROM thread_tweets").await;
        assert_eq!(orphaned, 0);
    }
}