1use chrono::{DateTime, Utc};
4use tokio::sync::mpsc;
5
6use super::super::analytics_loop::{AnalyticsError, AnalyticsStorage};
7use super::super::loop_helpers::{
8 ContentLoopError, ContentStorage, LoopError, LoopStorage, LoopTweet, TopicScorer,
9};
10use super::super::posting_queue::PostAction;
11use super::super::target_loop::TargetStorage;
12use super::helpers::{parse_datetime, sqlx_to_content_error, storage_to_loop_error};
13use crate::storage::{self, DbPool};
14
15pub struct StorageAdapter {
20 pool: DbPool,
21}
22
23impl StorageAdapter {
24 pub fn new(pool: DbPool) -> Self {
25 Self { pool }
26 }
27}
28
29#[async_trait::async_trait]
30impl LoopStorage for StorageAdapter {
31 async fn get_cursor(&self, key: &str) -> Result<Option<String>, LoopError> {
32 storage::cursors::get_cursor(&self.pool, key)
33 .await
34 .map_err(storage_to_loop_error)
35 }
36
37 async fn set_cursor(&self, key: &str, value: &str) -> Result<(), LoopError> {
38 storage::cursors::set_cursor(&self.pool, key, value)
39 .await
40 .map_err(storage_to_loop_error)
41 }
42
43 async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
44 storage::tweets::tweet_exists(&self.pool, tweet_id)
45 .await
46 .map_err(storage_to_loop_error)
47 }
48
49 async fn store_discovered_tweet(
50 &self,
51 tweet: &LoopTweet,
52 score: f32,
53 keyword: &str,
54 ) -> Result<(), LoopError> {
55 let discovered = storage::tweets::DiscoveredTweet {
56 id: tweet.id.clone(),
57 author_id: tweet.author_id.clone(),
58 author_username: tweet.author_username.clone(),
59 content: tweet.text.clone(),
60 like_count: tweet.likes as i64,
61 retweet_count: tweet.retweets as i64,
62 reply_count: tweet.replies as i64,
63 impression_count: None,
64 relevance_score: Some(score as f64),
65 matched_keyword: Some(keyword.to_string()),
66 discovered_at: Utc::now().to_rfc3339(),
67 replied_to: 0,
68 };
69 storage::tweets::insert_discovered_tweet(&self.pool, &discovered)
70 .await
71 .map_err(storage_to_loop_error)
72 }
73
74 async fn log_action(
75 &self,
76 action_type: &str,
77 status: &str,
78 message: &str,
79 ) -> Result<(), LoopError> {
80 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
81 .await
82 .map_err(storage_to_loop_error)
83 }
84}
85
86pub struct ContentStorageAdapter {
88 pool: DbPool,
89 post_tx: mpsc::Sender<PostAction>,
90}
91
92impl ContentStorageAdapter {
93 pub fn new(pool: DbPool, post_tx: mpsc::Sender<PostAction>) -> Self {
94 Self { pool, post_tx }
95 }
96}
97
98#[async_trait::async_trait]
99impl ContentStorage for ContentStorageAdapter {
100 async fn last_tweet_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
101 let time_str = storage::threads::get_last_original_tweet_time(&self.pool)
102 .await
103 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
104 Ok(time_str.and_then(|s| parse_datetime(&s)))
105 }
106
107 async fn todays_tweet_times(&self) -> Result<Vec<DateTime<Utc>>, ContentLoopError> {
108 let time_strs = storage::threads::get_todays_tweet_times(&self.pool)
109 .await
110 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
111 Ok(time_strs.iter().filter_map(|s| parse_datetime(s)).collect())
112 }
113
114 async fn last_thread_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
115 let time_str = storage::threads::get_last_thread_time(&self.pool)
116 .await
117 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
118 Ok(time_str.and_then(|s| parse_datetime(&s)))
119 }
120
121 async fn post_tweet(&self, topic: &str, content: &str) -> Result<(), ContentLoopError> {
122 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
124 self.post_tx
125 .send(PostAction::Tweet {
126 content: content.to_string(),
127 media_ids: vec![],
128 result_tx: Some(result_tx),
129 })
130 .await
131 .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?;
132
133 let tweet_id = result_rx
134 .await
135 .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?
136 .map_err(ContentLoopError::PostFailed)?;
137
138 let original = storage::threads::OriginalTweet {
140 id: 0,
141 tweet_id: Some(tweet_id),
142 content: content.to_string(),
143 topic: Some(topic.to_string()),
144 llm_provider: None,
145 created_at: Utc::now().to_rfc3339(),
146 status: "sent".to_string(),
147 error_message: None,
148 };
149 storage::threads::insert_original_tweet(&self.pool, &original)
150 .await
151 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
152
153 storage::rate_limits::increment_rate_limit(&self.pool, "tweet")
155 .await
156 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
157
158 Ok(())
159 }
160
161 async fn create_thread(
162 &self,
163 topic: &str,
164 tweet_count: usize,
165 ) -> Result<String, ContentLoopError> {
166 let thread = storage::threads::Thread {
167 id: 0,
168 topic: topic.to_string(),
169 tweet_count: tweet_count as i64,
170 root_tweet_id: None,
171 created_at: Utc::now().to_rfc3339(),
172 status: "pending".to_string(),
173 };
174 let id = storage::threads::insert_thread(&self.pool, &thread)
175 .await
176 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
177 Ok(id.to_string())
178 }
179
180 async fn update_thread_status(
181 &self,
182 thread_id: &str,
183 status: &str,
184 tweet_count: usize,
185 root_tweet_id: Option<&str>,
186 ) -> Result<(), ContentLoopError> {
187 let id: i64 = thread_id
188 .parse()
189 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
190
191 sqlx::query(
192 "UPDATE threads SET status = ?1, tweet_count = ?2, root_tweet_id = ?3 WHERE id = ?4",
193 )
194 .bind(status)
195 .bind(tweet_count as i64)
196 .bind(root_tweet_id)
197 .bind(id)
198 .execute(&self.pool)
199 .await
200 .map_err(sqlx_to_content_error)?;
201
202 if status == "sent" {
204 storage::rate_limits::increment_rate_limit(&self.pool, "thread")
205 .await
206 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
207 }
208
209 Ok(())
210 }
211
212 async fn store_thread_tweet(
213 &self,
214 thread_id: &str,
215 position: usize,
216 tweet_id: &str,
217 content: &str,
218 ) -> Result<(), ContentLoopError> {
219 let tid: i64 = thread_id
220 .parse()
221 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
222
223 sqlx::query(
224 "INSERT INTO thread_tweets (thread_id, position, tweet_id, content, created_at)
225 VALUES (?1, ?2, ?3, ?4, datetime('now'))",
226 )
227 .bind(tid)
228 .bind(position as i64)
229 .bind(tweet_id)
230 .bind(content)
231 .execute(&self.pool)
232 .await
233 .map_err(sqlx_to_content_error)?;
234
235 Ok(())
236 }
237
238 async fn log_action(
239 &self,
240 action_type: &str,
241 status: &str,
242 message: &str,
243 ) -> Result<(), ContentLoopError> {
244 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
245 .await
246 .map_err(|e| ContentLoopError::StorageError(e.to_string()))
247 }
248
249 async fn next_scheduled_item(&self) -> Result<Option<(i64, String, String)>, ContentLoopError> {
250 let items = storage::scheduled_content::get_due_items(&self.pool)
251 .await
252 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
253
254 Ok(items
255 .into_iter()
256 .next()
257 .map(|item| (item.id, item.content_type, item.content)))
258 }
259
260 async fn mark_scheduled_posted(
261 &self,
262 id: i64,
263 tweet_id: Option<&str>,
264 ) -> Result<(), ContentLoopError> {
265 storage::scheduled_content::update_status(&self.pool, id, "posted", tweet_id)
266 .await
267 .map_err(|e| ContentLoopError::StorageError(e.to_string()))
268 }
269}
270
271pub struct TargetStorageAdapter {
273 pool: DbPool,
274}
275
276impl TargetStorageAdapter {
277 pub fn new(pool: DbPool) -> Self {
278 Self { pool }
279 }
280}
281
282#[async_trait::async_trait]
283impl TargetStorage for TargetStorageAdapter {
284 async fn upsert_target_account(
285 &self,
286 account_id: &str,
287 username: &str,
288 ) -> Result<(), LoopError> {
289 storage::target_accounts::upsert_target_account(&self.pool, account_id, username)
290 .await
291 .map_err(storage_to_loop_error)
292 }
293
294 async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
295 storage::target_accounts::target_tweet_exists(&self.pool, tweet_id)
296 .await
297 .map_err(storage_to_loop_error)
298 }
299
300 async fn store_target_tweet(
301 &self,
302 tweet_id: &str,
303 account_id: &str,
304 content: &str,
305 created_at: &str,
306 reply_count: i64,
307 like_count: i64,
308 relevance_score: f64,
309 ) -> Result<(), LoopError> {
310 storage::target_accounts::store_target_tweet(
311 &self.pool,
312 tweet_id,
313 account_id,
314 content,
315 created_at,
316 reply_count,
317 like_count,
318 relevance_score,
319 )
320 .await
321 .map_err(storage_to_loop_error)
322 }
323
324 async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError> {
325 storage::target_accounts::mark_target_tweet_replied(&self.pool, tweet_id)
326 .await
327 .map_err(storage_to_loop_error)
328 }
329
330 async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError> {
331 storage::target_accounts::record_target_reply(&self.pool, account_id)
332 .await
333 .map_err(storage_to_loop_error)
334 }
335
336 async fn count_target_replies_today(&self) -> Result<i64, LoopError> {
337 storage::target_accounts::count_target_replies_today(&self.pool)
338 .await
339 .map_err(storage_to_loop_error)
340 }
341
342 async fn log_action(
343 &self,
344 action_type: &str,
345 status: &str,
346 message: &str,
347 ) -> Result<(), LoopError> {
348 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
349 .await
350 .map_err(storage_to_loop_error)
351 }
352}
353
354pub struct AnalyticsStorageAdapter {
356 pool: DbPool,
357}
358
359impl AnalyticsStorageAdapter {
360 pub fn new(pool: DbPool) -> Self {
361 Self { pool }
362 }
363}
364
365#[async_trait::async_trait]
366impl AnalyticsStorage for AnalyticsStorageAdapter {
367 async fn store_follower_snapshot(
368 &self,
369 followers: i64,
370 following: i64,
371 tweets: i64,
372 ) -> Result<(), AnalyticsError> {
373 storage::analytics::upsert_follower_snapshot(&self.pool, followers, following, tweets)
374 .await
375 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
376 }
377
378 async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError> {
379 let row: Option<(i64,)> = sqlx::query_as(
380 "SELECT follower_count FROM follower_snapshots
381 WHERE snapshot_date < date('now')
382 ORDER BY snapshot_date DESC LIMIT 1",
383 )
384 .fetch_optional(&self.pool)
385 .await
386 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
387 Ok(row.map(|(c,)| c))
388 }
389
390 async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
391 let rows: Vec<(String,)> = sqlx::query_as(
392 "SELECT rs.reply_tweet_id FROM replies_sent rs
393 WHERE rs.status = 'sent'
394 AND rs.reply_tweet_id IS NOT NULL
395 AND rs.created_at >= datetime('now', '-25 hours')
396 AND rs.created_at <= datetime('now', '-23 hours')
397 AND NOT EXISTS (
398 SELECT 1 FROM reply_performance rp WHERE rp.reply_id = rs.reply_tweet_id
399 )",
400 )
401 .fetch_all(&self.pool)
402 .await
403 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
404 Ok(rows.into_iter().map(|(id,)| id).collect())
405 }
406
407 async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
408 let rows: Vec<(String,)> = sqlx::query_as(
409 "SELECT ot.tweet_id FROM original_tweets ot
410 WHERE ot.status = 'sent'
411 AND ot.tweet_id IS NOT NULL
412 AND ot.created_at >= datetime('now', '-25 hours')
413 AND ot.created_at <= datetime('now', '-23 hours')
414 AND NOT EXISTS (
415 SELECT 1 FROM tweet_performance tp WHERE tp.tweet_id = ot.tweet_id
416 )",
417 )
418 .fetch_all(&self.pool)
419 .await
420 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
421 Ok(rows.into_iter().map(|(id,)| id).collect())
422 }
423
424 async fn store_reply_performance(
425 &self,
426 reply_id: &str,
427 likes: i64,
428 replies: i64,
429 impressions: i64,
430 score: f64,
431 ) -> Result<(), AnalyticsError> {
432 storage::analytics::upsert_reply_performance(
433 &self.pool,
434 reply_id,
435 likes,
436 replies,
437 impressions,
438 score,
439 )
440 .await
441 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
442 }
443
444 async fn store_tweet_performance(
445 &self,
446 tweet_id: &str,
447 likes: i64,
448 retweets: i64,
449 replies: i64,
450 impressions: i64,
451 score: f64,
452 ) -> Result<(), AnalyticsError> {
453 storage::analytics::upsert_tweet_performance(
454 &self.pool,
455 tweet_id,
456 likes,
457 retweets,
458 replies,
459 impressions,
460 score,
461 )
462 .await
463 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
464 }
465
466 async fn update_content_score(
467 &self,
468 topic: &str,
469 format: &str,
470 score: f64,
471 ) -> Result<(), AnalyticsError> {
472 storage::analytics::update_content_score(&self.pool, topic, format, score)
473 .await
474 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
475 }
476
477 async fn log_action(
478 &self,
479 action_type: &str,
480 status: &str,
481 message: &str,
482 ) -> Result<(), AnalyticsError> {
483 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
484 .await
485 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
486 }
487}
488
489pub struct TopicScorerAdapter {
491 pool: DbPool,
492}
493
494impl TopicScorerAdapter {
495 pub fn new(pool: DbPool) -> Self {
496 Self { pool }
497 }
498}
499
500#[async_trait::async_trait]
501impl TopicScorer for TopicScorerAdapter {
502 async fn get_top_topics(&self, limit: u32) -> Result<Vec<String>, ContentLoopError> {
503 let scores = storage::analytics::get_top_topics(&self.pool, limit)
504 .await
505 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
506 Ok(scores.into_iter().map(|cs| cs.topic).collect())
507 }
508}