1use std::collections::HashMap;
9use std::sync::Arc;
10
11use chrono::{DateTime, NaiveDateTime, Utc};
12use tokio::sync::mpsc;
13
14use crate::content::ContentGenerator;
15use crate::error::{LlmError, XApiError};
16use crate::safety::SafetyGuard;
17use crate::scoring::{self, ScoringEngine, TweetData};
18use crate::storage::{self, DbPool};
19use crate::x_api::{SearchResponse, XApiClient, XApiHttpClient};
20
21use super::analytics_loop::{AnalyticsError, AnalyticsStorage, EngagementFetcher, ProfileFetcher};
22use super::loop_helpers::{
23 ContentLoopError, ContentSafety, ContentStorage, LoopError, LoopStorage, LoopTweet,
24 MentionsFetcher, PostSender, ReplyGenerator, SafetyChecker, ScoreResult, ThreadPoster,
25 TopicScorer, TweetGenerator, TweetScorer, TweetSearcher,
26};
27use super::posting_queue::{ApprovalQueue, PostAction, PostExecutor};
28use super::status_reporter::{ActionCounts, StatusQuerier};
29use super::target_loop::{TargetStorage, TargetTweetFetcher, TargetUserManager};
30use super::thread_loop::ThreadGenerator;
31
32fn search_response_to_loop_tweets(response: SearchResponse) -> Vec<LoopTweet> {
41 let users: HashMap<&str, _> = response
42 .includes
43 .as_ref()
44 .map(|inc| inc.users.iter().map(|u| (u.id.as_str(), u)).collect())
45 .unwrap_or_default();
46
47 response
48 .data
49 .into_iter()
50 .map(|tweet| {
51 let user = users.get(tweet.author_id.as_str());
52 LoopTweet {
53 id: tweet.id,
54 text: tweet.text,
55 author_id: tweet.author_id,
56 author_username: user.map(|u| u.username.clone()).unwrap_or_default(),
57 author_followers: user.map(|u| u.public_metrics.followers_count).unwrap_or(0),
58 created_at: tweet.created_at,
59 likes: tweet.public_metrics.like_count,
60 retweets: tweet.public_metrics.retweet_count,
61 replies: tweet.public_metrics.reply_count,
62 }
63 })
64 .collect()
65}
66
67fn xapi_to_loop_error(e: XApiError) -> LoopError {
69 match e {
70 XApiError::RateLimited { retry_after } => LoopError::RateLimited { retry_after },
71 XApiError::AuthExpired => LoopError::AuthExpired,
72 XApiError::Network { source } => LoopError::NetworkError(source.to_string()),
73 other => LoopError::Other(other.to_string()),
74 }
75}
76
77fn xapi_to_content_error(e: XApiError) -> ContentLoopError {
79 match e {
80 XApiError::RateLimited { retry_after } => ContentLoopError::PostFailed(format!(
81 "rate limited{}",
82 retry_after
83 .map(|s| format!(", retry after {s}s"))
84 .unwrap_or_default()
85 )),
86 XApiError::Network { source } => ContentLoopError::NetworkError(source.to_string()),
87 other => ContentLoopError::PostFailed(other.to_string()),
88 }
89}
90
91fn xapi_to_analytics_error(e: XApiError) -> AnalyticsError {
93 AnalyticsError::ApiError(e.to_string())
94}
95
96fn llm_to_loop_error(e: LlmError) -> LoopError {
98 LoopError::LlmFailure(e.to_string())
99}
100
101fn llm_to_content_error(e: LlmError) -> ContentLoopError {
103 ContentLoopError::LlmFailure(e.to_string())
104}
105
106fn sqlx_to_content_error(e: sqlx::Error) -> ContentLoopError {
108 ContentLoopError::StorageError(e.to_string())
109}
110
111fn storage_to_loop_error(e: crate::error::StorageError) -> LoopError {
113 LoopError::StorageError(e.to_string())
114}
115
116fn parse_datetime(s: &str) -> Option<DateTime<Utc>> {
121 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
122 return Some(dt.with_timezone(&Utc));
123 }
124 if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
125 return Some(naive.and_utc());
126 }
127 if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ") {
128 return Some(naive.and_utc());
129 }
130 None
131}
132
133pub struct XApiSearchAdapter {
139 client: Arc<XApiHttpClient>,
140}
141
142impl XApiSearchAdapter {
143 pub fn new(client: Arc<XApiHttpClient>) -> Self {
144 Self { client }
145 }
146}
147
148#[async_trait::async_trait]
149impl TweetSearcher for XApiSearchAdapter {
150 async fn search_tweets(&self, query: &str) -> Result<Vec<LoopTweet>, LoopError> {
151 let response = self
152 .client
153 .search_tweets(query, 20, None)
154 .await
155 .map_err(xapi_to_loop_error)?;
156 Ok(search_response_to_loop_tweets(response))
157 }
158}
159
160pub struct XApiMentionsAdapter {
162 client: Arc<XApiHttpClient>,
163 own_user_id: String,
164}
165
166impl XApiMentionsAdapter {
167 pub fn new(client: Arc<XApiHttpClient>, own_user_id: String) -> Self {
168 Self {
169 client,
170 own_user_id,
171 }
172 }
173}
174
175#[async_trait::async_trait]
176impl MentionsFetcher for XApiMentionsAdapter {
177 async fn get_mentions(&self, since_id: Option<&str>) -> Result<Vec<LoopTweet>, LoopError> {
178 let response = self
179 .client
180 .get_mentions(&self.own_user_id, since_id)
181 .await
182 .map_err(xapi_to_loop_error)?;
183 Ok(search_response_to_loop_tweets(response))
184 }
185}
186
187pub struct XApiTargetAdapter {
189 client: Arc<XApiHttpClient>,
190}
191
192impl XApiTargetAdapter {
193 pub fn new(client: Arc<XApiHttpClient>) -> Self {
194 Self { client }
195 }
196}
197
198#[async_trait::async_trait]
199impl TargetTweetFetcher for XApiTargetAdapter {
200 async fn fetch_user_tweets(&self, user_id: &str) -> Result<Vec<LoopTweet>, LoopError> {
201 let response = self
202 .client
203 .get_user_tweets(user_id, 10)
204 .await
205 .map_err(xapi_to_loop_error)?;
206 Ok(search_response_to_loop_tweets(response))
207 }
208}
209
210#[async_trait::async_trait]
211impl TargetUserManager for XApiTargetAdapter {
212 async fn lookup_user(&self, username: &str) -> Result<(String, String), LoopError> {
213 let user = self
214 .client
215 .get_user_by_username(username)
216 .await
217 .map_err(xapi_to_loop_error)?;
218 Ok((user.id, user.username))
219 }
220
221 async fn follow_user(
222 &self,
223 source_user_id: &str,
224 target_user_id: &str,
225 ) -> Result<(), LoopError> {
226 self.client
227 .follow_user(source_user_id, target_user_id)
228 .await
229 .map_err(xapi_to_loop_error)
230 }
231}
232
233pub struct XApiProfileAdapter {
235 client: Arc<XApiHttpClient>,
236}
237
238impl XApiProfileAdapter {
239 pub fn new(client: Arc<XApiHttpClient>) -> Self {
240 Self { client }
241 }
242}
243
244#[async_trait::async_trait]
245impl ProfileFetcher for XApiProfileAdapter {
246 async fn get_profile_metrics(
247 &self,
248 ) -> Result<super::analytics_loop::ProfileMetrics, AnalyticsError> {
249 let user = self
250 .client
251 .get_me()
252 .await
253 .map_err(xapi_to_analytics_error)?;
254 Ok(super::analytics_loop::ProfileMetrics {
255 follower_count: user.public_metrics.followers_count as i64,
256 following_count: user.public_metrics.following_count as i64,
257 tweet_count: user.public_metrics.tweet_count as i64,
258 })
259 }
260}
261
262#[async_trait::async_trait]
263impl EngagementFetcher for XApiProfileAdapter {
264 async fn get_tweet_metrics(
265 &self,
266 tweet_id: &str,
267 ) -> Result<super::analytics_loop::TweetMetrics, AnalyticsError> {
268 let tweet = self
269 .client
270 .get_tweet(tweet_id)
271 .await
272 .map_err(xapi_to_analytics_error)?;
273 Ok(super::analytics_loop::TweetMetrics {
274 likes: tweet.public_metrics.like_count as i64,
275 retweets: tweet.public_metrics.retweet_count as i64,
276 replies: tweet.public_metrics.reply_count as i64,
277 impressions: tweet.public_metrics.impression_count as i64,
278 })
279 }
280}
281
282pub struct XApiPostExecutorAdapter {
284 client: Arc<XApiHttpClient>,
285}
286
287impl XApiPostExecutorAdapter {
288 pub fn new(client: Arc<XApiHttpClient>) -> Self {
289 Self { client }
290 }
291}
292
293#[async_trait::async_trait]
294impl PostExecutor for XApiPostExecutorAdapter {
295 async fn execute_reply(&self, tweet_id: &str, content: &str) -> Result<String, String> {
296 self.client
297 .reply_to_tweet(content, tweet_id)
298 .await
299 .map(|posted| posted.id)
300 .map_err(|e| e.to_string())
301 }
302
303 async fn execute_tweet(&self, content: &str) -> Result<String, String> {
304 self.client
305 .post_tweet(content)
306 .await
307 .map(|posted| posted.id)
308 .map_err(|e| e.to_string())
309 }
310}
311
312pub struct XApiThreadPosterAdapter {
314 client: Arc<XApiHttpClient>,
315}
316
317impl XApiThreadPosterAdapter {
318 pub fn new(client: Arc<XApiHttpClient>) -> Self {
319 Self { client }
320 }
321}
322
323#[async_trait::async_trait]
324impl ThreadPoster for XApiThreadPosterAdapter {
325 async fn post_tweet(&self, content: &str) -> Result<String, ContentLoopError> {
326 self.client
327 .post_tweet(content)
328 .await
329 .map(|posted| posted.id)
330 .map_err(xapi_to_content_error)
331 }
332
333 async fn reply_to_tweet(
334 &self,
335 in_reply_to: &str,
336 content: &str,
337 ) -> Result<String, ContentLoopError> {
338 self.client
339 .reply_to_tweet(content, in_reply_to)
340 .await
341 .map(|posted| posted.id)
342 .map_err(xapi_to_content_error)
343 }
344}
345
346pub struct LlmReplyAdapter {
352 generator: Arc<ContentGenerator>,
353}
354
355impl LlmReplyAdapter {
356 pub fn new(generator: Arc<ContentGenerator>) -> Self {
357 Self { generator }
358 }
359}
360
361#[async_trait::async_trait]
362impl ReplyGenerator for LlmReplyAdapter {
363 async fn generate_reply(
364 &self,
365 tweet_text: &str,
366 author: &str,
367 mention_product: bool,
368 ) -> Result<String, LoopError> {
369 self.generator
370 .generate_reply(tweet_text, author, mention_product)
371 .await
372 .map_err(llm_to_loop_error)
373 }
374}
375
376pub struct LlmTweetAdapter {
378 generator: Arc<ContentGenerator>,
379}
380
381impl LlmTweetAdapter {
382 pub fn new(generator: Arc<ContentGenerator>) -> Self {
383 Self { generator }
384 }
385}
386
387#[async_trait::async_trait]
388impl TweetGenerator for LlmTweetAdapter {
389 async fn generate_tweet(&self, topic: &str) -> Result<String, ContentLoopError> {
390 self.generator
391 .generate_tweet(topic)
392 .await
393 .map_err(llm_to_content_error)
394 }
395}
396
397pub struct LlmThreadAdapter {
399 generator: Arc<ContentGenerator>,
400}
401
402impl LlmThreadAdapter {
403 pub fn new(generator: Arc<ContentGenerator>) -> Self {
404 Self { generator }
405 }
406}
407
408#[async_trait::async_trait]
409impl ThreadGenerator for LlmThreadAdapter {
410 async fn generate_thread(
411 &self,
412 topic: &str,
413 _count: Option<usize>,
414 ) -> Result<Vec<String>, ContentLoopError> {
415 self.generator
416 .generate_thread(topic)
417 .await
418 .map_err(llm_to_content_error)
419 }
420}
421
422pub struct ScoringAdapter {
428 engine: Arc<ScoringEngine>,
429}
430
431impl ScoringAdapter {
432 pub fn new(engine: Arc<ScoringEngine>) -> Self {
433 Self { engine }
434 }
435}
436
437impl TweetScorer for ScoringAdapter {
438 fn score(&self, tweet: &LoopTweet) -> ScoreResult {
439 let data = TweetData {
440 text: tweet.text.clone(),
441 created_at: tweet.created_at.clone(),
442 likes: tweet.likes,
443 retweets: tweet.retweets,
444 replies: tweet.replies,
445 author_username: tweet.author_username.clone(),
446 author_followers: tweet.author_followers,
447 has_media: false,
448 is_quote_tweet: false,
449 };
450
451 let score = self.engine.score_tweet(&data);
452 let matched_keywords = scoring::find_matched_keywords(&tweet.text, self.engine.keywords());
453
454 ScoreResult {
455 total: score.total,
456 meets_threshold: score.meets_threshold,
457 matched_keywords,
458 }
459 }
460}
461
462pub struct SafetyAdapter {
468 guard: Arc<SafetyGuard>,
469 pool: DbPool,
470}
471
472impl SafetyAdapter {
473 pub fn new(guard: Arc<SafetyGuard>, pool: DbPool) -> Self {
474 Self { guard, pool }
475 }
476}
477
478#[async_trait::async_trait]
479impl SafetyChecker for SafetyAdapter {
480 async fn can_reply(&self) -> bool {
481 match self.guard.can_reply_to("__check__", None).await {
482 Ok(Ok(())) => true,
483 Ok(Err(reason)) => {
484 tracing::debug!(reason = %reason, "Safety check denied reply");
485 false
486 }
487 Err(e) => {
488 tracing::warn!(error = %e, "Safety check error, denying reply");
489 false
490 }
491 }
492 }
493
494 async fn has_replied_to(&self, tweet_id: &str) -> bool {
495 match self.guard.dedup_checker().has_replied_to(tweet_id).await {
496 Ok(replied) => replied,
497 Err(e) => {
498 tracing::warn!(error = %e, "Dedup check error, assuming already replied");
499 true
500 }
501 }
502 }
503
504 async fn record_reply(&self, tweet_id: &str, reply_content: &str) -> Result<(), LoopError> {
505 let reply = storage::replies::ReplySent {
507 id: 0,
508 target_tweet_id: tweet_id.to_string(),
509 reply_tweet_id: None,
510 reply_content: reply_content.to_string(),
511 llm_provider: None,
512 llm_model: None,
513 created_at: Utc::now().to_rfc3339(),
514 status: "pending".to_string(),
515 error_message: None,
516 };
517 storage::replies::insert_reply(&self.pool, &reply)
518 .await
519 .map_err(storage_to_loop_error)?;
520
521 self.guard
523 .record_reply()
524 .await
525 .map_err(storage_to_loop_error)?;
526
527 Ok(())
528 }
529}
530
531pub struct ContentSafetyAdapter {
533 guard: Arc<SafetyGuard>,
534}
535
536impl ContentSafetyAdapter {
537 pub fn new(guard: Arc<SafetyGuard>) -> Self {
538 Self { guard }
539 }
540}
541
542#[async_trait::async_trait]
543impl ContentSafety for ContentSafetyAdapter {
544 async fn can_post_tweet(&self) -> bool {
545 match self.guard.can_post_tweet().await {
546 Ok(Ok(())) => true,
547 Ok(Err(reason)) => {
548 tracing::debug!(reason = %reason, "Safety check denied tweet");
549 false
550 }
551 Err(e) => {
552 tracing::warn!(error = %e, "Safety check error, denying tweet");
553 false
554 }
555 }
556 }
557
558 async fn can_post_thread(&self) -> bool {
559 match self.guard.can_post_thread().await {
560 Ok(Ok(())) => true,
561 Ok(Err(reason)) => {
562 tracing::debug!(reason = %reason, "Safety check denied thread");
563 false
564 }
565 Err(e) => {
566 tracing::warn!(error = %e, "Safety check error, denying thread");
567 false
568 }
569 }
570 }
571}
572
573pub struct StorageAdapter {
582 pool: DbPool,
583}
584
585impl StorageAdapter {
586 pub fn new(pool: DbPool) -> Self {
587 Self { pool }
588 }
589}
590
591#[async_trait::async_trait]
592impl LoopStorage for StorageAdapter {
593 async fn get_cursor(&self, key: &str) -> Result<Option<String>, LoopError> {
594 storage::cursors::get_cursor(&self.pool, key)
595 .await
596 .map_err(storage_to_loop_error)
597 }
598
599 async fn set_cursor(&self, key: &str, value: &str) -> Result<(), LoopError> {
600 storage::cursors::set_cursor(&self.pool, key, value)
601 .await
602 .map_err(storage_to_loop_error)
603 }
604
605 async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
606 storage::tweets::tweet_exists(&self.pool, tweet_id)
607 .await
608 .map_err(storage_to_loop_error)
609 }
610
611 async fn store_discovered_tweet(
612 &self,
613 tweet: &LoopTweet,
614 score: f32,
615 keyword: &str,
616 ) -> Result<(), LoopError> {
617 let discovered = storage::tweets::DiscoveredTweet {
618 id: tweet.id.clone(),
619 author_id: tweet.author_id.clone(),
620 author_username: tweet.author_username.clone(),
621 content: tweet.text.clone(),
622 like_count: tweet.likes as i64,
623 retweet_count: tweet.retweets as i64,
624 reply_count: tweet.replies as i64,
625 impression_count: None,
626 relevance_score: Some(score as f64),
627 matched_keyword: Some(keyword.to_string()),
628 discovered_at: Utc::now().to_rfc3339(),
629 replied_to: 0,
630 };
631 storage::tweets::insert_discovered_tweet(&self.pool, &discovered)
632 .await
633 .map_err(storage_to_loop_error)
634 }
635
636 async fn log_action(
637 &self,
638 action_type: &str,
639 status: &str,
640 message: &str,
641 ) -> Result<(), LoopError> {
642 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
643 .await
644 .map_err(storage_to_loop_error)
645 }
646}
647
648pub struct ContentStorageAdapter {
650 pool: DbPool,
651 post_tx: mpsc::Sender<PostAction>,
652}
653
654impl ContentStorageAdapter {
655 pub fn new(pool: DbPool, post_tx: mpsc::Sender<PostAction>) -> Self {
656 Self { pool, post_tx }
657 }
658}
659
660#[async_trait::async_trait]
661impl ContentStorage for ContentStorageAdapter {
662 async fn last_tweet_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
663 let time_str = storage::threads::get_last_original_tweet_time(&self.pool)
664 .await
665 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
666 Ok(time_str.and_then(|s| parse_datetime(&s)))
667 }
668
669 async fn todays_tweet_times(&self) -> Result<Vec<DateTime<Utc>>, ContentLoopError> {
670 let time_strs = storage::threads::get_todays_tweet_times(&self.pool)
671 .await
672 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
673 Ok(time_strs.iter().filter_map(|s| parse_datetime(s)).collect())
674 }
675
676 async fn last_thread_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
677 let time_str = storage::threads::get_last_thread_time(&self.pool)
678 .await
679 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
680 Ok(time_str.and_then(|s| parse_datetime(&s)))
681 }
682
683 async fn post_tweet(&self, topic: &str, content: &str) -> Result<(), ContentLoopError> {
684 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
686 self.post_tx
687 .send(PostAction::Tweet {
688 content: content.to_string(),
689 result_tx: Some(result_tx),
690 })
691 .await
692 .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?;
693
694 let tweet_id = result_rx
695 .await
696 .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?
697 .map_err(ContentLoopError::PostFailed)?;
698
699 let original = storage::threads::OriginalTweet {
701 id: 0,
702 tweet_id: Some(tweet_id),
703 content: content.to_string(),
704 topic: Some(topic.to_string()),
705 llm_provider: None,
706 created_at: Utc::now().to_rfc3339(),
707 status: "sent".to_string(),
708 error_message: None,
709 };
710 storage::threads::insert_original_tweet(&self.pool, &original)
711 .await
712 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
713
714 storage::rate_limits::increment_rate_limit(&self.pool, "tweet")
716 .await
717 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
718
719 Ok(())
720 }
721
722 async fn create_thread(
723 &self,
724 topic: &str,
725 tweet_count: usize,
726 ) -> Result<String, ContentLoopError> {
727 let thread = storage::threads::Thread {
728 id: 0,
729 topic: topic.to_string(),
730 tweet_count: tweet_count as i64,
731 root_tweet_id: None,
732 created_at: Utc::now().to_rfc3339(),
733 status: "pending".to_string(),
734 };
735 let id = storage::threads::insert_thread(&self.pool, &thread)
736 .await
737 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
738 Ok(id.to_string())
739 }
740
741 async fn update_thread_status(
742 &self,
743 thread_id: &str,
744 status: &str,
745 tweet_count: usize,
746 root_tweet_id: Option<&str>,
747 ) -> Result<(), ContentLoopError> {
748 let id: i64 = thread_id
749 .parse()
750 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
751
752 sqlx::query(
753 "UPDATE threads SET status = ?1, tweet_count = ?2, root_tweet_id = ?3 WHERE id = ?4",
754 )
755 .bind(status)
756 .bind(tweet_count as i64)
757 .bind(root_tweet_id)
758 .bind(id)
759 .execute(&self.pool)
760 .await
761 .map_err(sqlx_to_content_error)?;
762
763 if status == "sent" {
765 storage::rate_limits::increment_rate_limit(&self.pool, "thread")
766 .await
767 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
768 }
769
770 Ok(())
771 }
772
773 async fn store_thread_tweet(
774 &self,
775 thread_id: &str,
776 position: usize,
777 tweet_id: &str,
778 content: &str,
779 ) -> Result<(), ContentLoopError> {
780 let tid: i64 = thread_id
781 .parse()
782 .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
783
784 sqlx::query(
785 "INSERT INTO thread_tweets (thread_id, position, tweet_id, content, created_at)
786 VALUES (?1, ?2, ?3, ?4, datetime('now'))",
787 )
788 .bind(tid)
789 .bind(position as i64)
790 .bind(tweet_id)
791 .bind(content)
792 .execute(&self.pool)
793 .await
794 .map_err(sqlx_to_content_error)?;
795
796 Ok(())
797 }
798
799 async fn log_action(
800 &self,
801 action_type: &str,
802 status: &str,
803 message: &str,
804 ) -> Result<(), ContentLoopError> {
805 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
806 .await
807 .map_err(|e| ContentLoopError::StorageError(e.to_string()))
808 }
809}
810
811pub struct TargetStorageAdapter {
813 pool: DbPool,
814}
815
816impl TargetStorageAdapter {
817 pub fn new(pool: DbPool) -> Self {
818 Self { pool }
819 }
820}
821
822#[async_trait::async_trait]
823impl TargetStorage for TargetStorageAdapter {
824 async fn upsert_target_account(
825 &self,
826 account_id: &str,
827 username: &str,
828 ) -> Result<(), LoopError> {
829 storage::target_accounts::upsert_target_account(&self.pool, account_id, username)
830 .await
831 .map_err(storage_to_loop_error)
832 }
833
834 async fn get_followed_at(&self, account_id: &str) -> Result<Option<String>, LoopError> {
835 let account = storage::target_accounts::get_target_account(&self.pool, account_id)
836 .await
837 .map_err(storage_to_loop_error)?;
838 Ok(account.and_then(|a| a.followed_at))
839 }
840
841 async fn record_follow(&self, account_id: &str) -> Result<(), LoopError> {
842 storage::target_accounts::record_follow(&self.pool, account_id)
843 .await
844 .map_err(storage_to_loop_error)
845 }
846
847 async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
848 storage::target_accounts::target_tweet_exists(&self.pool, tweet_id)
849 .await
850 .map_err(storage_to_loop_error)
851 }
852
853 async fn store_target_tweet(
854 &self,
855 tweet_id: &str,
856 account_id: &str,
857 content: &str,
858 created_at: &str,
859 reply_count: i64,
860 like_count: i64,
861 relevance_score: f64,
862 ) -> Result<(), LoopError> {
863 storage::target_accounts::store_target_tweet(
864 &self.pool,
865 tweet_id,
866 account_id,
867 content,
868 created_at,
869 reply_count,
870 like_count,
871 relevance_score,
872 )
873 .await
874 .map_err(storage_to_loop_error)
875 }
876
877 async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError> {
878 storage::target_accounts::mark_target_tweet_replied(&self.pool, tweet_id)
879 .await
880 .map_err(storage_to_loop_error)
881 }
882
883 async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError> {
884 storage::target_accounts::record_target_reply(&self.pool, account_id)
885 .await
886 .map_err(storage_to_loop_error)
887 }
888
889 async fn count_target_replies_today(&self) -> Result<i64, LoopError> {
890 storage::target_accounts::count_target_replies_today(&self.pool)
891 .await
892 .map_err(storage_to_loop_error)
893 }
894
895 async fn log_action(
896 &self,
897 action_type: &str,
898 status: &str,
899 message: &str,
900 ) -> Result<(), LoopError> {
901 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
902 .await
903 .map_err(storage_to_loop_error)
904 }
905}
906
907pub struct AnalyticsStorageAdapter {
909 pool: DbPool,
910}
911
912impl AnalyticsStorageAdapter {
913 pub fn new(pool: DbPool) -> Self {
914 Self { pool }
915 }
916}
917
918#[async_trait::async_trait]
919impl AnalyticsStorage for AnalyticsStorageAdapter {
920 async fn store_follower_snapshot(
921 &self,
922 followers: i64,
923 following: i64,
924 tweets: i64,
925 ) -> Result<(), AnalyticsError> {
926 storage::analytics::upsert_follower_snapshot(&self.pool, followers, following, tweets)
927 .await
928 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
929 }
930
931 async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError> {
932 let row: Option<(i64,)> = sqlx::query_as(
933 "SELECT follower_count FROM follower_snapshots
934 WHERE snapshot_date < date('now')
935 ORDER BY snapshot_date DESC LIMIT 1",
936 )
937 .fetch_optional(&self.pool)
938 .await
939 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
940 Ok(row.map(|(c,)| c))
941 }
942
943 async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
944 let rows: Vec<(String,)> = sqlx::query_as(
945 "SELECT rs.reply_tweet_id FROM replies_sent rs
946 WHERE rs.status = 'sent'
947 AND rs.reply_tweet_id IS NOT NULL
948 AND rs.created_at >= datetime('now', '-25 hours')
949 AND rs.created_at <= datetime('now', '-23 hours')
950 AND NOT EXISTS (
951 SELECT 1 FROM reply_performance rp WHERE rp.reply_id = rs.reply_tweet_id
952 )",
953 )
954 .fetch_all(&self.pool)
955 .await
956 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
957 Ok(rows.into_iter().map(|(id,)| id).collect())
958 }
959
960 async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
961 let rows: Vec<(String,)> = sqlx::query_as(
962 "SELECT ot.tweet_id FROM original_tweets ot
963 WHERE ot.status = 'sent'
964 AND ot.tweet_id IS NOT NULL
965 AND ot.created_at >= datetime('now', '-25 hours')
966 AND ot.created_at <= datetime('now', '-23 hours')
967 AND NOT EXISTS (
968 SELECT 1 FROM tweet_performance tp WHERE tp.tweet_id = ot.tweet_id
969 )",
970 )
971 .fetch_all(&self.pool)
972 .await
973 .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
974 Ok(rows.into_iter().map(|(id,)| id).collect())
975 }
976
977 async fn store_reply_performance(
978 &self,
979 reply_id: &str,
980 likes: i64,
981 replies: i64,
982 impressions: i64,
983 score: f64,
984 ) -> Result<(), AnalyticsError> {
985 storage::analytics::upsert_reply_performance(
986 &self.pool,
987 reply_id,
988 likes,
989 replies,
990 impressions,
991 score,
992 )
993 .await
994 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
995 }
996
997 async fn store_tweet_performance(
998 &self,
999 tweet_id: &str,
1000 likes: i64,
1001 retweets: i64,
1002 replies: i64,
1003 impressions: i64,
1004 score: f64,
1005 ) -> Result<(), AnalyticsError> {
1006 storage::analytics::upsert_tweet_performance(
1007 &self.pool,
1008 tweet_id,
1009 likes,
1010 retweets,
1011 replies,
1012 impressions,
1013 score,
1014 )
1015 .await
1016 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
1017 }
1018
1019 async fn update_content_score(
1020 &self,
1021 topic: &str,
1022 format: &str,
1023 score: f64,
1024 ) -> Result<(), AnalyticsError> {
1025 storage::analytics::update_content_score(&self.pool, topic, format, score)
1026 .await
1027 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
1028 }
1029
1030 async fn log_action(
1031 &self,
1032 action_type: &str,
1033 status: &str,
1034 message: &str,
1035 ) -> Result<(), AnalyticsError> {
1036 storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
1037 .await
1038 .map_err(|e| AnalyticsError::StorageError(e.to_string()))
1039 }
1040}
1041
1042pub struct TopicScorerAdapter {
1044 pool: DbPool,
1045}
1046
1047impl TopicScorerAdapter {
1048 pub fn new(pool: DbPool) -> Self {
1049 Self { pool }
1050 }
1051}
1052
1053#[async_trait::async_trait]
1054impl TopicScorer for TopicScorerAdapter {
1055 async fn get_top_topics(&self, limit: u32) -> Result<Vec<String>, ContentLoopError> {
1056 let scores = storage::analytics::get_top_topics(&self.pool, limit)
1057 .await
1058 .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
1059 Ok(scores.into_iter().map(|cs| cs.topic).collect())
1060 }
1061}
1062
1063pub struct PostSenderAdapter {
1069 tx: mpsc::Sender<PostAction>,
1070}
1071
1072impl PostSenderAdapter {
1073 pub fn new(tx: mpsc::Sender<PostAction>) -> Self {
1074 Self { tx }
1075 }
1076}
1077
1078#[async_trait::async_trait]
1079impl PostSender for PostSenderAdapter {
1080 async fn send_reply(&self, tweet_id: &str, content: &str) -> Result<(), LoopError> {
1081 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
1082 self.tx
1083 .send(PostAction::Reply {
1084 tweet_id: tweet_id.to_string(),
1085 content: content.to_string(),
1086 result_tx: Some(result_tx),
1087 })
1088 .await
1089 .map_err(|e| LoopError::Other(format!("posting queue send failed: {e}")))?;
1090
1091 result_rx
1092 .await
1093 .map_err(|e| LoopError::Other(format!("posting queue result recv failed: {e}")))?
1094 .map_err(|e| LoopError::Other(format!("post action failed: {e}")))?;
1095
1096 Ok(())
1097 }
1098}
1099
1100pub struct ApprovalQueueAdapter {
1102 pool: DbPool,
1103}
1104
1105impl ApprovalQueueAdapter {
1106 pub fn new(pool: DbPool) -> Self {
1107 Self { pool }
1108 }
1109}
1110
1111#[async_trait::async_trait]
1112impl ApprovalQueue for ApprovalQueueAdapter {
1113 async fn queue_reply(&self, tweet_id: &str, content: &str) -> Result<i64, String> {
1114 storage::approval_queue::enqueue(
1115 &self.pool, "reply", tweet_id, "", content, "", "", 0.0, )
1120 .await
1121 .map_err(|e| e.to_string())
1122 }
1123
1124 async fn queue_tweet(&self, content: &str) -> Result<i64, String> {
1125 storage::approval_queue::enqueue(
1126 &self.pool, "tweet", "", "", content, "", "", 0.0, )
1132 .await
1133 .map_err(|e| e.to_string())
1134 }
1135}
1136
1137pub struct StatusQuerierAdapter {
1143 pool: DbPool,
1144}
1145
1146impl StatusQuerierAdapter {
1147 pub fn new(pool: DbPool) -> Self {
1148 Self { pool }
1149 }
1150}
1151
1152#[async_trait::async_trait]
1153impl StatusQuerier for StatusQuerierAdapter {
1154 async fn query_action_counts_since(
1155 &self,
1156 since: DateTime<Utc>,
1157 ) -> Result<ActionCounts, String> {
1158 let since_str = since.format("%Y-%m-%dT%H:%M:%SZ").to_string();
1159 let counts = storage::action_log::get_action_counts_since(&self.pool, &since_str)
1160 .await
1161 .map_err(|e| e.to_string())?;
1162
1163 Ok(ActionCounts {
1164 tweets_scored: *counts.get("tweet_scored").unwrap_or(&0) as u64,
1165 replies_sent: *counts.get("reply_sent").unwrap_or(&0) as u64,
1166 tweets_posted: *counts.get("tweet_posted").unwrap_or(&0) as u64,
1167 threads_posted: *counts.get("thread_posted").unwrap_or(&0) as u64,
1168 })
1169 }
1170}