1use chrono::{DateTime, Utc};
4use tokio::sync::mpsc;
5
6use super::super::analytics_loop::{AnalyticsError, AnalyticsStorage};
7use super::super::loop_helpers::{
8 ContentLoopError, ContentStorage, LoopError, LoopStorage, LoopTweet, TopicScorer,
9};
10use super::super::posting_queue::PostAction;
11use super::super::target_loop::TargetStorage;
12use super::helpers::{parse_datetime, sqlx_to_content_error, storage_to_loop_error};
13use crate::storage::{self, DbPool};
14
15pub struct StorageAdapter {
20 pool: DbPool,
21}
22
23impl StorageAdapter {
24 pub fn new(pool: DbPool) -> Self {
25 Self { pool }
26 }
27}
28
29#[async_trait::async_trait]
30impl LoopStorage for StorageAdapter {
31 async fn get_cursor(&self, key: &str) -> Result<Option<String>, LoopError> {
32 storage::cursors::get_cursor(&self.pool, key)
33 .await
34 .map_err(storage_to_loop_error)
35 }
36
37 async fn set_cursor(&self, key: &str, value: &str) -> Result<(), LoopError> {
38 storage::cursors::set_cursor(&self.pool, key, value)
39 .await
40 .map_err(storage_to_loop_error)
41 }
42
43 async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
44 storage::tweets::tweet_exists(&self.pool, tweet_id)
45 .await
46 .map_err(storage_to_loop_error)
47 }
48
49 async fn store_discovered_tweet(
50 &self,
51 tweet: &LoopTweet,
52 score: f32,
53 keyword: &str,
54 ) -> Result<(), LoopError> {
55 let discovered = storage::tweets::DiscoveredTweet {
56 id: tweet.id.clone(),
57 author_id: tweet.author_id.clone(),
58 author_username: tweet.author_username.clone(),
59 content: tweet.text.clone(),
60 like_count: tweet.likes as i64,
61 retweet_count: tweet.retweets as i64,
62 reply_count: tweet.replies as i64,
63 impression_count: None,
64 relevance_score: Some(score as f64),
65 matched_keyword: Some(keyword.to_string()),
66 discovered_at: Utc::now().to_rfc3339(),
67 replied_to: 0,
68 };
69 storage::tweets::insert_discovered_tweet(&self.pool, &discovered)
70 .await
71 .map_err(storage_to_loop_error)
72 }
73
74 async fn log_action(
75 &self,
76 action_type: &str,
77 status: &str,
78 message: &str,
79 ) -> Result<(), LoopError> {
80 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
81 .await
82 .map_err(storage_to_loop_error)
83 }
84}
85
86pub struct ContentStorageAdapter {
88 pool: DbPool,
89 post_tx: mpsc::Sender<PostAction>,
90}
91
92impl ContentStorageAdapter {
93 pub fn new(pool: DbPool, post_tx: mpsc::Sender<PostAction>) -> Self {
94 Self { pool, post_tx }
95 }
96}
97
98#[async_trait::async_trait]
99impl ContentStorage for ContentStorageAdapter {
100 async fn last_tweet_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
101 let time_str = storage::threads::get_last_original_tweet_time(&self.pool)
102 .await
103 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
104 Ok(time_str.and_then(|s| parse_datetime(&s)))
105 }
106
107 async fn todays_tweet_times(&self) -> Result<Vec<DateTime<Utc>>, ContentLoopError> {
108 let time_strs = storage::threads::get_todays_tweet_times(&self.pool)
109 .await
110 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
111 Ok(time_strs.iter().filter_map(|s| parse_datetime(s)).collect())
112 }
113
114 async fn last_thread_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
115 let time_str = storage::threads::get_last_thread_time(&self.pool)
116 .await
117 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
118 Ok(time_str.and_then(|s| parse_datetime(&s)))
119 }
120
121 async fn post_tweet(&self, topic: &str, content: &str) -> Result<(), ContentLoopError> {
122 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
124 self.post_tx
125 .send(PostAction::Tweet {
126 content: content.to_string(),
127 media_ids: vec![],
128 result_tx: Some(result_tx),
129 })
130 .await
131 .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?;
132
133 let tweet_id = result_rx
134 .await
135 .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?
136 .map_err(ContentLoopError::PostFailed)?;
137
138 let original = storage::threads::OriginalTweet {
140 id: 0,
141 tweet_id: Some(tweet_id),
142 content: content.to_string(),
143 topic: Some(topic.to_string()),
144 llm_provider: None,
145 created_at: Utc::now().to_rfc3339(),
146 status: "sent".to_string(),
147 error_message: None,
148 };
149 storage::threads::insert_original_tweet(&self.pool, &original)
150 .await
151 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
152
153 storage::rate_limits::increment_rate_limit(&self.pool, "tweet")
155 .await
156 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
157
158 Ok(())
159 }
160
161 async fn create_thread(
162 &self,
163 topic: &str,
164 tweet_count: usize,
165 ) -> Result<String, ContentLoopError> {
166 let thread = storage::threads::Thread {
167 id: 0,
168 topic: topic.to_string(),
169 tweet_count: tweet_count as i64,
170 root_tweet_id: None,
171 created_at: Utc::now().to_rfc3339(),
172 status: "pending".to_string(),
173 };
174 let id = storage::threads::insert_thread(&self.pool, &thread)
175 .await
176 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
177 Ok(id.to_string())
178 }
179
180 async fn update_thread_status(
181 &self,
182 thread_id: &str,
183 status: &str,
184 tweet_count: usize,
185 root_tweet_id: Option<&str>,
186 ) -> Result<(), ContentLoopError> {
187 let id: i64 = thread_id
188 .parse()
189 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
190
191 sqlx::query(
192 "UPDATE threads SET status = ?1, tweet_count = ?2, root_tweet_id = ?3 WHERE id = ?4",
193 )
194 .bind(status)
195 .bind(tweet_count as i64)
196 .bind(root_tweet_id)
197 .bind(id)
198 .execute(&self.pool)
199 .await
200 .map_err(sqlx_to_content_error)?;
201
202 if status == "sent" {
204 storage::rate_limits::increment_rate_limit(&self.pool, "thread")
205 .await
206 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
207 }
208
209 Ok(())
210 }
211
212 async fn store_thread_tweet(
213 &self,
214 thread_id: &str,
215 position: usize,
216 tweet_id: &str,
217 content: &str,
218 ) -> Result<(), ContentLoopError> {
219 let tid: i64 = thread_id
220 .parse()
221 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
222
223 sqlx::query(
224 "INSERT INTO thread_tweets (thread_id, position, tweet_id, content, created_at)
225 VALUES (?1, ?2, ?3, ?4, datetime('now'))",
226 )
227 .bind(tid)
228 .bind(position as i64)
229 .bind(tweet_id)
230 .bind(content)
231 .execute(&self.pool)
232 .await
233 .map_err(sqlx_to_content_error)?;
234
235 Ok(())
236 }
237
238 async fn log_action(
239 &self,
240 action_type: &str,
241 status: &str,
242 message: &str,
243 ) -> Result<(), ContentLoopError> {
244 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
245 .await
246 .map_err(|e| ContentLoopError::StorageError(e.to_string()))
247 }
248
249 async fn next_scheduled_item(&self) -> Result<Option<(i64, String, String)>, ContentLoopError> {
250 let items = storage::scheduled_content::get_due_items(&self.pool)
251 .await
252 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
253
254 Ok(items
255 .into_iter()
256 .next()
257 .map(|item| (item.id, item.content_type, item.content)))
258 }
259
260 async fn mark_scheduled_posted(
261 &self,
262 id: i64,
263 tweet_id: Option<&str>,
264 ) -> Result<(), ContentLoopError> {
265 storage::scheduled_content::update_status(&self.pool, id, "posted", tweet_id)
266 .await
267 .map_err(|e| ContentLoopError::StorageError(e.to_string()))
268 }
269
270 async fn mark_failed_permanent(
271 &self,
272 thread_id: &str,
273 error: &str,
274 ) -> Result<(), ContentLoopError> {
275 let id: i64 = thread_id
276 .parse()
277 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
278
279 sqlx::query(
281 "UPDATE threads SET status = ?1, failure_kind = ?2, last_error = ?3, failed_at = datetime('now') WHERE id = ?4",
282 )
283 .bind("failed")
284 .bind("permanent")
285 .bind(error)
286 .bind(id)
287 .execute(&self.pool)
288 .await
289 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
290
291 let row: (String, u32) =
294 sqlx::query_as("SELECT topic, retry_count FROM threads WHERE id = ?1")
295 .bind(id)
296 .fetch_one(&self.pool)
297 .await
298 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
299
300 let (topic, retry_count) = row;
301
302 let tweets: Vec<(String,)> = sqlx::query_as(
304 "SELECT content FROM thread_tweets WHERE thread_id = ?1 ORDER BY position",
305 )
306 .bind(id)
307 .fetch_all(&self.pool)
308 .await
309 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
310
311 let content = if tweets.is_empty() {
312 format!("Failed thread id={}", id)
313 } else {
314 tweets
315 .iter()
316 .map(|t| t.0.as_str())
317 .collect::<Vec<_>>()
318 .join("\n---\n")
319 };
320
321 let metadata = format!(
323 "Failed thread id={}, retries={}, error: {}",
324 id, retry_count, error
325 );
326
327 sqlx::query(
329 "INSERT INTO approval_queue (action_type, generated_content, topic, status, reason) VALUES (?1, ?2, ?3, ?4, ?5)",
330 )
331 .bind("failed_post_recovery")
332 .bind(content)
333 .bind(topic)
334 .bind("pending")
335 .bind(metadata)
336 .execute(&self.pool)
337 .await
338 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
339
340 Ok(())
341 }
342
343 async fn increment_retry(&self, thread_id: &str, error: &str) -> Result<u32, ContentLoopError> {
344 let id: i64 = thread_id
345 .parse()
346 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
347
348 sqlx::query(
350 "UPDATE threads SET retry_count = retry_count + 1, failure_kind = ?1, last_error = ?2, failed_at = datetime('now') WHERE id = ?3",
351 )
352 .bind("transient")
353 .bind(error)
354 .bind(id)
355 .execute(&self.pool)
356 .await
357 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
358
359 let row: (i64,) = sqlx::query_as("SELECT retry_count FROM threads WHERE id = ?1")
361 .bind(id)
362 .fetch_one(&self.pool)
363 .await
364 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
365
366 Ok(row.0 as u32)
367 }
368}
369
370pub struct TargetStorageAdapter {
372 pool: DbPool,
373}
374
375impl TargetStorageAdapter {
376 pub fn new(pool: DbPool) -> Self {
377 Self { pool }
378 }
379}
380
381#[async_trait::async_trait]
382impl TargetStorage for TargetStorageAdapter {
383 async fn upsert_target_account(
384 &self,
385 account_id: &str,
386 username: &str,
387 ) -> Result<(), LoopError> {
388 storage::target_accounts::upsert_target_account(&self.pool, account_id, username)
389 .await
390 .map_err(storage_to_loop_error)
391 }
392
393 async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
394 storage::target_accounts::target_tweet_exists(&self.pool, tweet_id)
395 .await
396 .map_err(storage_to_loop_error)
397 }
398
399 async fn store_target_tweet(
400 &self,
401 tweet_id: &str,
402 account_id: &str,
403 content: &str,
404 created_at: &str,
405 reply_count: i64,
406 like_count: i64,
407 relevance_score: f64,
408 ) -> Result<(), LoopError> {
409 storage::target_accounts::store_target_tweet(
410 &self.pool,
411 tweet_id,
412 account_id,
413 content,
414 created_at,
415 reply_count,
416 like_count,
417 relevance_score,
418 )
419 .await
420 .map_err(storage_to_loop_error)
421 }
422
423 async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError> {
424 storage::target_accounts::mark_target_tweet_replied(&self.pool, tweet_id)
425 .await
426 .map_err(storage_to_loop_error)
427 }
428
429 async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError> {
430 storage::target_accounts::record_target_reply(&self.pool, account_id)
431 .await
432 .map_err(storage_to_loop_error)
433 }
434
435 async fn count_target_replies_today(&self) -> Result<i64, LoopError> {
436 storage::target_accounts::count_target_replies_today(&self.pool)
437 .await
438 .map_err(storage_to_loop_error)
439 }
440
441 async fn log_action(
442 &self,
443 action_type: &str,
444 status: &str,
445 message: &str,
446 ) -> Result<(), LoopError> {
447 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
448 .await
449 .map_err(storage_to_loop_error)
450 }
451}
452
453pub struct AnalyticsStorageAdapter {
455 pool: DbPool,
456}
457
458impl AnalyticsStorageAdapter {
459 pub fn new(pool: DbPool) -> Self {
460 Self { pool }
461 }
462}
463
464#[async_trait::async_trait]
465impl AnalyticsStorage for AnalyticsStorageAdapter {
466 async fn store_follower_snapshot(
467 &self,
468 followers: i64,
469 following: i64,
470 tweets: i64,
471 ) -> Result<(), AnalyticsError> {
472 storage::analytics::upsert_follower_snapshot(&self.pool, followers, following, tweets)
473 .await
474 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
475 }
476
477 async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError> {
478 let row: Option<(i64,)> = sqlx::query_as(
479 "SELECT follower_count FROM follower_snapshots
480 WHERE snapshot_date < date('now')
481 ORDER BY snapshot_date DESC LIMIT 1",
482 )
483 .fetch_optional(&self.pool)
484 .await
485 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
486 Ok(row.map(|(c,)| c))
487 }
488
489 async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
490 let rows: Vec<(String,)> = sqlx::query_as(
491 "SELECT rs.reply_tweet_id FROM replies_sent rs
492 WHERE rs.status = 'sent'
493 AND rs.reply_tweet_id IS NOT NULL
494 AND rs.created_at >= datetime('now', '-25 hours')
495 AND rs.created_at <= datetime('now', '-23 hours')
496 AND NOT EXISTS (
497 SELECT 1 FROM reply_performance rp WHERE rp.reply_id = rs.reply_tweet_id
498 )",
499 )
500 .fetch_all(&self.pool)
501 .await
502 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
503 Ok(rows.into_iter().map(|(id,)| id).collect())
504 }
505
506 async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
507 let rows: Vec<(String,)> = sqlx::query_as(
508 "SELECT ot.tweet_id FROM original_tweets ot
509 WHERE ot.status = 'sent'
510 AND ot.tweet_id IS NOT NULL
511 AND ot.created_at >= datetime('now', '-25 hours')
512 AND ot.created_at <= datetime('now', '-23 hours')
513 AND NOT EXISTS (
514 SELECT 1 FROM tweet_performance tp WHERE tp.tweet_id = ot.tweet_id
515 )",
516 )
517 .fetch_all(&self.pool)
518 .await
519 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
520 Ok(rows.into_iter().map(|(id,)| id).collect())
521 }
522
523 async fn store_reply_performance(
524 &self,
525 reply_id: &str,
526 likes: i64,
527 replies: i64,
528 impressions: i64,
529 score: f64,
530 ) -> Result<(), AnalyticsError> {
531 storage::analytics::upsert_reply_performance(
532 &self.pool,
533 reply_id,
534 likes,
535 replies,
536 impressions,
537 score,
538 )
539 .await
540 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
541 }
542
543 async fn store_tweet_performance(
544 &self,
545 tweet_id: &str,
546 likes: i64,
547 retweets: i64,
548 replies: i64,
549 impressions: i64,
550 score: f64,
551 ) -> Result<(), AnalyticsError> {
552 storage::analytics::upsert_tweet_performance(
553 &self.pool,
554 tweet_id,
555 likes,
556 retweets,
557 replies,
558 impressions,
559 score,
560 )
561 .await
562 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
563 }
564
565 async fn update_content_score(
566 &self,
567 topic: &str,
568 format: &str,
569 score: f64,
570 ) -> Result<(), AnalyticsError> {
571 storage::analytics::update_content_score(&self.pool, topic, format, score)
572 .await
573 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
574 }
575
576 async fn log_action(
577 &self,
578 action_type: &str,
579 status: &str,
580 message: &str,
581 ) -> Result<(), AnalyticsError> {
582 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
583 .await
584 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
585 }
586
587 async fn run_aggregations(&self) -> Result<(), AnalyticsError> {
588 let account_id = storage::accounts::DEFAULT_ACCOUNT_ID;
589 storage::analytics::aggregate_best_times_for(&self.pool, account_id)
590 .await
591 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
592 storage::analytics::aggregate_reach_for(&self.pool, account_id)
593 .await
594 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
595 Ok(())
596 }
597}
598
599pub struct TopicScorerAdapter {
601 pool: DbPool,
602}
603
604impl TopicScorerAdapter {
605 pub fn new(pool: DbPool) -> Self {
606 Self { pool }
607 }
608}
609
610#[async_trait::async_trait]
611impl TopicScorer for TopicScorerAdapter {
612 async fn get_top_topics(&self, limit: u32) -> Result<Vec<String>, ContentLoopError> {
613 let scores = storage::analytics::get_top_topics(&self.pool, limit)
614 .await
615 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
616 Ok(scores.into_iter().map(|cs| cs.topic).collect())
617 }
618}