Skip to main content

tuitbot_core/automation/
adapters.rs

1//! Adapter implementations bridging port traits to real dependencies.
2//!
3//! Each adapter struct wraps one or more concrete dependencies (X API client,
4//! content generator, scoring engine, safety guard, database pool, posting queue)
5//! and implements the port traits defined in [`loop_helpers`], [`analytics_loop`],
6//! [`target_loop`], [`thread_loop`], [`posting_queue`], and [`status_reporter`].
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use chrono::{DateTime, NaiveDateTime, Utc};
12use tokio::sync::mpsc;
13
14use crate::content::ContentGenerator;
15use crate::error::{LlmError, XApiError};
16use crate::safety::SafetyGuard;
17use crate::scoring::{self, ScoringEngine, TweetData};
18use crate::storage::{self, DbPool};
19use crate::x_api::{SearchResponse, XApiClient, XApiHttpClient};
20
21use super::analytics_loop::{AnalyticsError, AnalyticsStorage, EngagementFetcher, ProfileFetcher};
22use super::loop_helpers::{
23    ContentLoopError, ContentSafety, ContentStorage, LoopError, LoopStorage, LoopTweet,
24    MentionsFetcher, PostSender, ReplyGenerator, SafetyChecker, ScoreResult, ThreadPoster,
25    TopicScorer, TweetGenerator, TweetScorer, TweetSearcher,
26};
27use super::posting_queue::{ApprovalQueue, PostAction, PostExecutor};
28use super::status_reporter::{ActionCounts, StatusQuerier};
29use super::target_loop::{TargetStorage, TargetTweetFetcher, TargetUserManager};
30use super::thread_loop::ThreadGenerator;
31
32// ============================================================================
33// Helper functions
34// ============================================================================
35
36/// Convert an X API `SearchResponse` to a `Vec<LoopTweet>`.
37///
38/// Joins tweet data with user data from the `includes` expansion to populate
39/// author username and follower count.
40fn search_response_to_loop_tweets(response: SearchResponse) -> Vec<LoopTweet> {
41    let users: HashMap<&str, _> = response
42        .includes
43        .as_ref()
44        .map(|inc| inc.users.iter().map(|u| (u.id.as_str(), u)).collect())
45        .unwrap_or_default();
46
47    response
48        .data
49        .into_iter()
50        .map(|tweet| {
51            let user = users.get(tweet.author_id.as_str());
52            LoopTweet {
53                id: tweet.id,
54                text: tweet.text,
55                author_id: tweet.author_id,
56                author_username: user.map(|u| u.username.clone()).unwrap_or_default(),
57                author_followers: user.map(|u| u.public_metrics.followers_count).unwrap_or(0),
58                created_at: tweet.created_at,
59                likes: tweet.public_metrics.like_count,
60                retweets: tweet.public_metrics.retweet_count,
61                replies: tweet.public_metrics.reply_count,
62            }
63        })
64        .collect()
65}
66
67/// Map `XApiError` to `LoopError`.
68fn xapi_to_loop_error(e: XApiError) -> LoopError {
69    match e {
70        XApiError::RateLimited { retry_after } => LoopError::RateLimited { retry_after },
71        XApiError::AuthExpired => LoopError::AuthExpired,
72        XApiError::Network { source } => LoopError::NetworkError(source.to_string()),
73        other => LoopError::Other(other.to_string()),
74    }
75}
76
77/// Map `XApiError` to `ContentLoopError`.
78fn xapi_to_content_error(e: XApiError) -> ContentLoopError {
79    match e {
80        XApiError::RateLimited { retry_after } => ContentLoopError::PostFailed(format!(
81            "rate limited{}",
82            retry_after
83                .map(|s| format!(", retry after {s}s"))
84                .unwrap_or_default()
85        )),
86        XApiError::Network { source } => ContentLoopError::NetworkError(source.to_string()),
87        other => ContentLoopError::PostFailed(other.to_string()),
88    }
89}
90
91/// Map `XApiError` to `AnalyticsError`.
92fn xapi_to_analytics_error(e: XApiError) -> AnalyticsError {
93    AnalyticsError::ApiError(e.to_string())
94}
95
96/// Map `LlmError` to `LoopError`.
97fn llm_to_loop_error(e: LlmError) -> LoopError {
98    LoopError::LlmFailure(e.to_string())
99}
100
101/// Map `LlmError` to `ContentLoopError`.
102fn llm_to_content_error(e: LlmError) -> ContentLoopError {
103    ContentLoopError::LlmFailure(e.to_string())
104}
105
106/// Map `sqlx::Error` to `ContentLoopError`.
107fn sqlx_to_content_error(e: sqlx::Error) -> ContentLoopError {
108    ContentLoopError::StorageError(e.to_string())
109}
110
111/// Map `StorageError` to `LoopError`.
112fn storage_to_loop_error(e: crate::error::StorageError) -> LoopError {
113    LoopError::StorageError(e.to_string())
114}
115
116/// Parse a datetime string into `DateTime<Utc>`.
117///
118/// Tries RFC-3339 first, then `%Y-%m-%d %H:%M:%S` (SQLite `datetime()` format),
119/// then `%Y-%m-%dT%H:%M:%SZ`.
120fn parse_datetime(s: &str) -> Option<DateTime<Utc>> {
121    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
122        return Some(dt.with_timezone(&Utc));
123    }
124    if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
125        return Some(naive.and_utc());
126    }
127    if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%SZ") {
128        return Some(naive.and_utc());
129    }
130    None
131}
132
133// ============================================================================
134// X API adapters
135// ============================================================================
136
137/// Adapts `XApiHttpClient` to the `TweetSearcher` port trait.
138pub struct XApiSearchAdapter {
139    client: Arc<XApiHttpClient>,
140}
141
142impl XApiSearchAdapter {
143    pub fn new(client: Arc<XApiHttpClient>) -> Self {
144        Self { client }
145    }
146}
147
148#[async_trait::async_trait]
149impl TweetSearcher for XApiSearchAdapter {
150    async fn search_tweets(&self, query: &str) -> Result<Vec<LoopTweet>, LoopError> {
151        let response = self
152            .client
153            .search_tweets(query, 20, None)
154            .await
155            .map_err(xapi_to_loop_error)?;
156        Ok(search_response_to_loop_tweets(response))
157    }
158}
159
160/// Adapts `XApiHttpClient` to the `MentionsFetcher` port trait.
161pub struct XApiMentionsAdapter {
162    client: Arc<XApiHttpClient>,
163    own_user_id: String,
164}
165
166impl XApiMentionsAdapter {
167    pub fn new(client: Arc<XApiHttpClient>, own_user_id: String) -> Self {
168        Self {
169            client,
170            own_user_id,
171        }
172    }
173}
174
175#[async_trait::async_trait]
176impl MentionsFetcher for XApiMentionsAdapter {
177    async fn get_mentions(&self, since_id: Option<&str>) -> Result<Vec<LoopTweet>, LoopError> {
178        let response = self
179            .client
180            .get_mentions(&self.own_user_id, since_id)
181            .await
182            .map_err(xapi_to_loop_error)?;
183        Ok(search_response_to_loop_tweets(response))
184    }
185}
186
187/// Adapts `XApiHttpClient` to `TargetTweetFetcher` and `TargetUserManager`.
188pub struct XApiTargetAdapter {
189    client: Arc<XApiHttpClient>,
190}
191
192impl XApiTargetAdapter {
193    pub fn new(client: Arc<XApiHttpClient>) -> Self {
194        Self { client }
195    }
196}
197
198#[async_trait::async_trait]
199impl TargetTweetFetcher for XApiTargetAdapter {
200    async fn fetch_user_tweets(&self, user_id: &str) -> Result<Vec<LoopTweet>, LoopError> {
201        let response = self
202            .client
203            .get_user_tweets(user_id, 10)
204            .await
205            .map_err(xapi_to_loop_error)?;
206        Ok(search_response_to_loop_tweets(response))
207    }
208}
209
210#[async_trait::async_trait]
211impl TargetUserManager for XApiTargetAdapter {
212    async fn lookup_user(&self, username: &str) -> Result<(String, String), LoopError> {
213        let user = self
214            .client
215            .get_user_by_username(username)
216            .await
217            .map_err(xapi_to_loop_error)?;
218        Ok((user.id, user.username))
219    }
220
221    async fn follow_user(
222        &self,
223        source_user_id: &str,
224        target_user_id: &str,
225    ) -> Result<(), LoopError> {
226        self.client
227            .follow_user(source_user_id, target_user_id)
228            .await
229            .map_err(xapi_to_loop_error)
230    }
231}
232
233/// Adapts `XApiHttpClient` to `ProfileFetcher` and `EngagementFetcher`.
234pub struct XApiProfileAdapter {
235    client: Arc<XApiHttpClient>,
236}
237
238impl XApiProfileAdapter {
239    pub fn new(client: Arc<XApiHttpClient>) -> Self {
240        Self { client }
241    }
242}
243
244#[async_trait::async_trait]
245impl ProfileFetcher for XApiProfileAdapter {
246    async fn get_profile_metrics(
247        &self,
248    ) -> Result<super::analytics_loop::ProfileMetrics, AnalyticsError> {
249        let user = self
250            .client
251            .get_me()
252            .await
253            .map_err(xapi_to_analytics_error)?;
254        Ok(super::analytics_loop::ProfileMetrics {
255            follower_count: user.public_metrics.followers_count as i64,
256            following_count: user.public_metrics.following_count as i64,
257            tweet_count: user.public_metrics.tweet_count as i64,
258        })
259    }
260}
261
262#[async_trait::async_trait]
263impl EngagementFetcher for XApiProfileAdapter {
264    async fn get_tweet_metrics(
265        &self,
266        tweet_id: &str,
267    ) -> Result<super::analytics_loop::TweetMetrics, AnalyticsError> {
268        let tweet = self
269            .client
270            .get_tweet(tweet_id)
271            .await
272            .map_err(xapi_to_analytics_error)?;
273        Ok(super::analytics_loop::TweetMetrics {
274            likes: tweet.public_metrics.like_count as i64,
275            retweets: tweet.public_metrics.retweet_count as i64,
276            replies: tweet.public_metrics.reply_count as i64,
277            impressions: tweet.public_metrics.impression_count as i64,
278        })
279    }
280}
281
282/// Adapts `XApiHttpClient` to `PostExecutor` (for the posting queue).
283pub struct XApiPostExecutorAdapter {
284    client: Arc<XApiHttpClient>,
285}
286
287impl XApiPostExecutorAdapter {
288    pub fn new(client: Arc<XApiHttpClient>) -> Self {
289        Self { client }
290    }
291}
292
293#[async_trait::async_trait]
294impl PostExecutor for XApiPostExecutorAdapter {
295    async fn execute_reply(&self, tweet_id: &str, content: &str) -> Result<String, String> {
296        self.client
297            .reply_to_tweet(content, tweet_id)
298            .await
299            .map(|posted| posted.id)
300            .map_err(|e| e.to_string())
301    }
302
303    async fn execute_tweet(&self, content: &str) -> Result<String, String> {
304        self.client
305            .post_tweet(content)
306            .await
307            .map(|posted| posted.id)
308            .map_err(|e| e.to_string())
309    }
310}
311
312/// Adapts `XApiHttpClient` to `ThreadPoster` (for direct thread posting).
313pub struct XApiThreadPosterAdapter {
314    client: Arc<XApiHttpClient>,
315}
316
317impl XApiThreadPosterAdapter {
318    pub fn new(client: Arc<XApiHttpClient>) -> Self {
319        Self { client }
320    }
321}
322
323#[async_trait::async_trait]
324impl ThreadPoster for XApiThreadPosterAdapter {
325    async fn post_tweet(&self, content: &str) -> Result<String, ContentLoopError> {
326        self.client
327            .post_tweet(content)
328            .await
329            .map(|posted| posted.id)
330            .map_err(xapi_to_content_error)
331    }
332
333    async fn reply_to_tweet(
334        &self,
335        in_reply_to: &str,
336        content: &str,
337    ) -> Result<String, ContentLoopError> {
338        self.client
339            .reply_to_tweet(content, in_reply_to)
340            .await
341            .map(|posted| posted.id)
342            .map_err(xapi_to_content_error)
343    }
344}
345
346// ============================================================================
347// LLM adapters
348// ============================================================================
349
350/// Adapts `ContentGenerator` to the `ReplyGenerator` port trait.
351pub struct LlmReplyAdapter {
352    generator: Arc<ContentGenerator>,
353}
354
355impl LlmReplyAdapter {
356    pub fn new(generator: Arc<ContentGenerator>) -> Self {
357        Self { generator }
358    }
359}
360
361#[async_trait::async_trait]
362impl ReplyGenerator for LlmReplyAdapter {
363    async fn generate_reply(
364        &self,
365        tweet_text: &str,
366        author: &str,
367        mention_product: bool,
368    ) -> Result<String, LoopError> {
369        self.generator
370            .generate_reply(tweet_text, author, mention_product)
371            .await
372            .map_err(llm_to_loop_error)
373    }
374}
375
376/// Adapts `ContentGenerator` to the `TweetGenerator` port trait.
377pub struct LlmTweetAdapter {
378    generator: Arc<ContentGenerator>,
379}
380
381impl LlmTweetAdapter {
382    pub fn new(generator: Arc<ContentGenerator>) -> Self {
383        Self { generator }
384    }
385}
386
387#[async_trait::async_trait]
388impl TweetGenerator for LlmTweetAdapter {
389    async fn generate_tweet(&self, topic: &str) -> Result<String, ContentLoopError> {
390        self.generator
391            .generate_tweet(topic)
392            .await
393            .map_err(llm_to_content_error)
394    }
395}
396
397/// Adapts `ContentGenerator` to the `ThreadGenerator` port trait.
398pub struct LlmThreadAdapter {
399    generator: Arc<ContentGenerator>,
400}
401
402impl LlmThreadAdapter {
403    pub fn new(generator: Arc<ContentGenerator>) -> Self {
404        Self { generator }
405    }
406}
407
408#[async_trait::async_trait]
409impl ThreadGenerator for LlmThreadAdapter {
410    async fn generate_thread(
411        &self,
412        topic: &str,
413        _count: Option<usize>,
414    ) -> Result<Vec<String>, ContentLoopError> {
415        self.generator
416            .generate_thread(topic)
417            .await
418            .map_err(llm_to_content_error)
419    }
420}
421
422// ============================================================================
423// Scoring adapter
424// ============================================================================
425
426/// Adapts `ScoringEngine` to the `TweetScorer` port trait.
427pub struct ScoringAdapter {
428    engine: Arc<ScoringEngine>,
429}
430
431impl ScoringAdapter {
432    pub fn new(engine: Arc<ScoringEngine>) -> Self {
433        Self { engine }
434    }
435}
436
437impl TweetScorer for ScoringAdapter {
438    fn score(&self, tweet: &LoopTweet) -> ScoreResult {
439        let data = TweetData {
440            text: tweet.text.clone(),
441            created_at: tweet.created_at.clone(),
442            likes: tweet.likes,
443            retweets: tweet.retweets,
444            replies: tweet.replies,
445            author_username: tweet.author_username.clone(),
446            author_followers: tweet.author_followers,
447            has_media: false,
448            is_quote_tweet: false,
449        };
450
451        let score = self.engine.score_tweet(&data);
452        let matched_keywords = scoring::find_matched_keywords(&tweet.text, self.engine.keywords());
453
454        ScoreResult {
455            total: score.total,
456            meets_threshold: score.meets_threshold,
457            matched_keywords,
458        }
459    }
460}
461
462// ============================================================================
463// Safety adapters
464// ============================================================================
465
466/// Adapts `SafetyGuard` to the `SafetyChecker` port trait.
467pub struct SafetyAdapter {
468    guard: Arc<SafetyGuard>,
469    pool: DbPool,
470}
471
472impl SafetyAdapter {
473    pub fn new(guard: Arc<SafetyGuard>, pool: DbPool) -> Self {
474        Self { guard, pool }
475    }
476}
477
478#[async_trait::async_trait]
479impl SafetyChecker for SafetyAdapter {
480    async fn can_reply(&self) -> bool {
481        match self.guard.can_reply_to("__check__", None).await {
482            Ok(Ok(())) => true,
483            Ok(Err(reason)) => {
484                tracing::debug!(reason = %reason, "Safety check denied reply");
485                false
486            }
487            Err(e) => {
488                tracing::warn!(error = %e, "Safety check error, denying reply");
489                false
490            }
491        }
492    }
493
494    async fn has_replied_to(&self, tweet_id: &str) -> bool {
495        match self.guard.dedup_checker().has_replied_to(tweet_id).await {
496            Ok(replied) => replied,
497            Err(e) => {
498                tracing::warn!(error = %e, "Dedup check error, assuming already replied");
499                true
500            }
501        }
502    }
503
504    async fn record_reply(&self, tweet_id: &str, reply_content: &str) -> Result<(), LoopError> {
505        // Insert a reply record for dedup tracking.
506        let reply = storage::replies::ReplySent {
507            id: 0,
508            target_tweet_id: tweet_id.to_string(),
509            reply_tweet_id: None,
510            reply_content: reply_content.to_string(),
511            llm_provider: None,
512            llm_model: None,
513            created_at: Utc::now().to_rfc3339(),
514            status: "pending".to_string(),
515            error_message: None,
516        };
517        storage::replies::insert_reply(&self.pool, &reply)
518            .await
519            .map_err(storage_to_loop_error)?;
520
521        // Increment rate limit counter.
522        self.guard
523            .record_reply()
524            .await
525            .map_err(storage_to_loop_error)?;
526
527        Ok(())
528    }
529}
530
531/// Adapts `SafetyGuard` to the `ContentSafety` port trait.
532pub struct ContentSafetyAdapter {
533    guard: Arc<SafetyGuard>,
534}
535
536impl ContentSafetyAdapter {
537    pub fn new(guard: Arc<SafetyGuard>) -> Self {
538        Self { guard }
539    }
540}
541
542#[async_trait::async_trait]
543impl ContentSafety for ContentSafetyAdapter {
544    async fn can_post_tweet(&self) -> bool {
545        match self.guard.can_post_tweet().await {
546            Ok(Ok(())) => true,
547            Ok(Err(reason)) => {
548                tracing::debug!(reason = %reason, "Safety check denied tweet");
549                false
550            }
551            Err(e) => {
552                tracing::warn!(error = %e, "Safety check error, denying tweet");
553                false
554            }
555        }
556    }
557
558    async fn can_post_thread(&self) -> bool {
559        match self.guard.can_post_thread().await {
560            Ok(Ok(())) => true,
561            Ok(Err(reason)) => {
562                tracing::debug!(reason = %reason, "Safety check denied thread");
563                false
564            }
565            Err(e) => {
566                tracing::warn!(error = %e, "Safety check error, denying thread");
567                false
568            }
569        }
570    }
571}
572
573// ============================================================================
574// Storage adapters
575// ============================================================================
576
577/// Adapts `DbPool` to the `LoopStorage` port trait.
578///
579/// Provides cursor persistence (via the `cursors` table), tweet dedup,
580/// discovered tweet recording, and action logging.
581pub struct StorageAdapter {
582    pool: DbPool,
583}
584
585impl StorageAdapter {
586    pub fn new(pool: DbPool) -> Self {
587        Self { pool }
588    }
589}
590
591#[async_trait::async_trait]
592impl LoopStorage for StorageAdapter {
593    async fn get_cursor(&self, key: &str) -> Result<Option<String>, LoopError> {
594        storage::cursors::get_cursor(&self.pool, key)
595            .await
596            .map_err(storage_to_loop_error)
597    }
598
599    async fn set_cursor(&self, key: &str, value: &str) -> Result<(), LoopError> {
600        storage::cursors::set_cursor(&self.pool, key, value)
601            .await
602            .map_err(storage_to_loop_error)
603    }
604
605    async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
606        storage::tweets::tweet_exists(&self.pool, tweet_id)
607            .await
608            .map_err(storage_to_loop_error)
609    }
610
611    async fn store_discovered_tweet(
612        &self,
613        tweet: &LoopTweet,
614        score: f32,
615        keyword: &str,
616    ) -> Result<(), LoopError> {
617        let discovered = storage::tweets::DiscoveredTweet {
618            id: tweet.id.clone(),
619            author_id: tweet.author_id.clone(),
620            author_username: tweet.author_username.clone(),
621            content: tweet.text.clone(),
622            like_count: tweet.likes as i64,
623            retweet_count: tweet.retweets as i64,
624            reply_count: tweet.replies as i64,
625            impression_count: None,
626            relevance_score: Some(score as f64),
627            matched_keyword: Some(keyword.to_string()),
628            discovered_at: Utc::now().to_rfc3339(),
629            replied_to: 0,
630        };
631        storage::tweets::insert_discovered_tweet(&self.pool, &discovered)
632            .await
633            .map_err(storage_to_loop_error)
634    }
635
636    async fn log_action(
637        &self,
638        action_type: &str,
639        status: &str,
640        message: &str,
641    ) -> Result<(), LoopError> {
642        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
643            .await
644            .map_err(storage_to_loop_error)
645    }
646}
647
648/// Adapts `DbPool` + posting queue to the `ContentStorage` port trait.
649pub struct ContentStorageAdapter {
650    pool: DbPool,
651    post_tx: mpsc::Sender<PostAction>,
652}
653
654impl ContentStorageAdapter {
655    pub fn new(pool: DbPool, post_tx: mpsc::Sender<PostAction>) -> Self {
656        Self { pool, post_tx }
657    }
658}
659
660#[async_trait::async_trait]
661impl ContentStorage for ContentStorageAdapter {
662    async fn last_tweet_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
663        let time_str = storage::threads::get_last_original_tweet_time(&self.pool)
664            .await
665            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
666        Ok(time_str.and_then(|s| parse_datetime(&s)))
667    }
668
669    async fn todays_tweet_times(&self) -> Result<Vec<DateTime<Utc>>, ContentLoopError> {
670        let time_strs = storage::threads::get_todays_tweet_times(&self.pool)
671            .await
672            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
673        Ok(time_strs.iter().filter_map(|s| parse_datetime(s)).collect())
674    }
675
676    async fn last_thread_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
677        let time_str = storage::threads::get_last_thread_time(&self.pool)
678            .await
679            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
680        Ok(time_str.and_then(|s| parse_datetime(&s)))
681    }
682
683    async fn post_tweet(&self, topic: &str, content: &str) -> Result<(), ContentLoopError> {
684        // Send to the posting queue and await result.
685        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
686        self.post_tx
687            .send(PostAction::Tweet {
688                content: content.to_string(),
689                result_tx: Some(result_tx),
690            })
691            .await
692            .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?;
693
694        let tweet_id = result_rx
695            .await
696            .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?
697            .map_err(ContentLoopError::PostFailed)?;
698
699        // Record in the database.
700        let original = storage::threads::OriginalTweet {
701            id: 0,
702            tweet_id: Some(tweet_id),
703            content: content.to_string(),
704            topic: Some(topic.to_string()),
705            llm_provider: None,
706            created_at: Utc::now().to_rfc3339(),
707            status: "sent".to_string(),
708            error_message: None,
709        };
710        storage::threads::insert_original_tweet(&self.pool, &original)
711            .await
712            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
713
714        // Increment rate limit.
715        storage::rate_limits::increment_rate_limit(&self.pool, "tweet")
716            .await
717            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
718
719        Ok(())
720    }
721
722    async fn create_thread(
723        &self,
724        topic: &str,
725        tweet_count: usize,
726    ) -> Result<String, ContentLoopError> {
727        let thread = storage::threads::Thread {
728            id: 0,
729            topic: topic.to_string(),
730            tweet_count: tweet_count as i64,
731            root_tweet_id: None,
732            created_at: Utc::now().to_rfc3339(),
733            status: "pending".to_string(),
734        };
735        let id = storage::threads::insert_thread(&self.pool, &thread)
736            .await
737            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
738        Ok(id.to_string())
739    }
740
741    async fn update_thread_status(
742        &self,
743        thread_id: &str,
744        status: &str,
745        tweet_count: usize,
746        root_tweet_id: Option<&str>,
747    ) -> Result<(), ContentLoopError> {
748        let id: i64 = thread_id
749            .parse()
750            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
751
752        sqlx::query(
753            "UPDATE threads SET status = ?1, tweet_count = ?2, root_tweet_id = ?3 WHERE id = ?4",
754        )
755        .bind(status)
756        .bind(tweet_count as i64)
757        .bind(root_tweet_id)
758        .bind(id)
759        .execute(&self.pool)
760        .await
761        .map_err(sqlx_to_content_error)?;
762
763        // If the thread was fully posted, increment the rate limit.
764        if status == "sent" {
765            storage::rate_limits::increment_rate_limit(&self.pool, "thread")
766                .await
767                .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
768        }
769
770        Ok(())
771    }
772
773    async fn store_thread_tweet(
774        &self,
775        thread_id: &str,
776        position: usize,
777        tweet_id: &str,
778        content: &str,
779    ) -> Result<(), ContentLoopError> {
780        let tid: i64 = thread_id
781            .parse()
782            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
783
784        sqlx::query(
785            "INSERT INTO thread_tweets (thread_id, position, tweet_id, content, created_at)
786             VALUES (?1, ?2, ?3, ?4, datetime('now'))",
787        )
788        .bind(tid)
789        .bind(position as i64)
790        .bind(tweet_id)
791        .bind(content)
792        .execute(&self.pool)
793        .await
794        .map_err(sqlx_to_content_error)?;
795
796        Ok(())
797    }
798
799    async fn log_action(
800        &self,
801        action_type: &str,
802        status: &str,
803        message: &str,
804    ) -> Result<(), ContentLoopError> {
805        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
806            .await
807            .map_err(|e| ContentLoopError::StorageError(e.to_string()))
808    }
809}
810
811/// Adapts `DbPool` to the `TargetStorage` port trait.
812pub struct TargetStorageAdapter {
813    pool: DbPool,
814}
815
816impl TargetStorageAdapter {
817    pub fn new(pool: DbPool) -> Self {
818        Self { pool }
819    }
820}
821
822#[async_trait::async_trait]
823impl TargetStorage for TargetStorageAdapter {
824    async fn upsert_target_account(
825        &self,
826        account_id: &str,
827        username: &str,
828    ) -> Result<(), LoopError> {
829        storage::target_accounts::upsert_target_account(&self.pool, account_id, username)
830            .await
831            .map_err(storage_to_loop_error)
832    }
833
834    async fn get_followed_at(&self, account_id: &str) -> Result<Option<String>, LoopError> {
835        let account = storage::target_accounts::get_target_account(&self.pool, account_id)
836            .await
837            .map_err(storage_to_loop_error)?;
838        Ok(account.and_then(|a| a.followed_at))
839    }
840
841    async fn record_follow(&self, account_id: &str) -> Result<(), LoopError> {
842        storage::target_accounts::record_follow(&self.pool, account_id)
843            .await
844            .map_err(storage_to_loop_error)
845    }
846
847    async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
848        storage::target_accounts::target_tweet_exists(&self.pool, tweet_id)
849            .await
850            .map_err(storage_to_loop_error)
851    }
852
853    async fn store_target_tweet(
854        &self,
855        tweet_id: &str,
856        account_id: &str,
857        content: &str,
858        created_at: &str,
859        reply_count: i64,
860        like_count: i64,
861        relevance_score: f64,
862    ) -> Result<(), LoopError> {
863        storage::target_accounts::store_target_tweet(
864            &self.pool,
865            tweet_id,
866            account_id,
867            content,
868            created_at,
869            reply_count,
870            like_count,
871            relevance_score,
872        )
873        .await
874        .map_err(storage_to_loop_error)
875    }
876
877    async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError> {
878        storage::target_accounts::mark_target_tweet_replied(&self.pool, tweet_id)
879            .await
880            .map_err(storage_to_loop_error)
881    }
882
883    async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError> {
884        storage::target_accounts::record_target_reply(&self.pool, account_id)
885            .await
886            .map_err(storage_to_loop_error)
887    }
888
889    async fn count_target_replies_today(&self) -> Result<i64, LoopError> {
890        storage::target_accounts::count_target_replies_today(&self.pool)
891            .await
892            .map_err(storage_to_loop_error)
893    }
894
895    async fn log_action(
896        &self,
897        action_type: &str,
898        status: &str,
899        message: &str,
900    ) -> Result<(), LoopError> {
901        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
902            .await
903            .map_err(storage_to_loop_error)
904    }
905}
906
907/// Adapts `DbPool` to the `AnalyticsStorage` port trait.
908pub struct AnalyticsStorageAdapter {
909    pool: DbPool,
910}
911
912impl AnalyticsStorageAdapter {
913    pub fn new(pool: DbPool) -> Self {
914        Self { pool }
915    }
916}
917
918#[async_trait::async_trait]
919impl AnalyticsStorage for AnalyticsStorageAdapter {
920    async fn store_follower_snapshot(
921        &self,
922        followers: i64,
923        following: i64,
924        tweets: i64,
925    ) -> Result<(), AnalyticsError> {
926        storage::analytics::upsert_follower_snapshot(&self.pool, followers, following, tweets)
927            .await
928            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
929    }
930
931    async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError> {
932        let row: Option<(i64,)> = sqlx::query_as(
933            "SELECT follower_count FROM follower_snapshots
934             WHERE snapshot_date < date('now')
935             ORDER BY snapshot_date DESC LIMIT 1",
936        )
937        .fetch_optional(&self.pool)
938        .await
939        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
940        Ok(row.map(|(c,)| c))
941    }
942
943    async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
944        let rows: Vec<(String,)> = sqlx::query_as(
945            "SELECT rs.reply_tweet_id FROM replies_sent rs
946             WHERE rs.status = 'sent'
947               AND rs.reply_tweet_id IS NOT NULL
948               AND rs.created_at >= datetime('now', '-25 hours')
949               AND rs.created_at <= datetime('now', '-23 hours')
950               AND NOT EXISTS (
951                   SELECT 1 FROM reply_performance rp WHERE rp.reply_id = rs.reply_tweet_id
952               )",
953        )
954        .fetch_all(&self.pool)
955        .await
956        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
957        Ok(rows.into_iter().map(|(id,)| id).collect())
958    }
959
960    async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
961        let rows: Vec<(String,)> = sqlx::query_as(
962            "SELECT ot.tweet_id FROM original_tweets ot
963             WHERE ot.status = 'sent'
964               AND ot.tweet_id IS NOT NULL
965               AND ot.created_at >= datetime('now', '-25 hours')
966               AND ot.created_at <= datetime('now', '-23 hours')
967               AND NOT EXISTS (
968                   SELECT 1 FROM tweet_performance tp WHERE tp.tweet_id = ot.tweet_id
969               )",
970        )
971        .fetch_all(&self.pool)
972        .await
973        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
974        Ok(rows.into_iter().map(|(id,)| id).collect())
975    }
976
977    async fn store_reply_performance(
978        &self,
979        reply_id: &str,
980        likes: i64,
981        replies: i64,
982        impressions: i64,
983        score: f64,
984    ) -> Result<(), AnalyticsError> {
985        storage::analytics::upsert_reply_performance(
986            &self.pool,
987            reply_id,
988            likes,
989            replies,
990            impressions,
991            score,
992        )
993        .await
994        .map_err(|e| AnalyticsError::StorageError(e.to_string()))
995    }
996
997    async fn store_tweet_performance(
998        &self,
999        tweet_id: &str,
1000        likes: i64,
1001        retweets: i64,
1002        replies: i64,
1003        impressions: i64,
1004        score: f64,
1005    ) -> Result<(), AnalyticsError> {
1006        storage::analytics::upsert_tweet_performance(
1007            &self.pool,
1008            tweet_id,
1009            likes,
1010            retweets,
1011            replies,
1012            impressions,
1013            score,
1014        )
1015        .await
1016        .map_err(|e| AnalyticsError::StorageError(e.to_string()))
1017    }
1018
1019    async fn update_content_score(
1020        &self,
1021        topic: &str,
1022        format: &str,
1023        score: f64,
1024    ) -> Result<(), AnalyticsError> {
1025        storage::analytics::update_content_score(&self.pool, topic, format, score)
1026            .await
1027            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
1028    }
1029
1030    async fn log_action(
1031        &self,
1032        action_type: &str,
1033        status: &str,
1034        message: &str,
1035    ) -> Result<(), AnalyticsError> {
1036        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
1037            .await
1038            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
1039    }
1040}
1041
1042/// Adapts `DbPool` to the `TopicScorer` port trait.
1043pub struct TopicScorerAdapter {
1044    pool: DbPool,
1045}
1046
1047impl TopicScorerAdapter {
1048    pub fn new(pool: DbPool) -> Self {
1049        Self { pool }
1050    }
1051}
1052
1053#[async_trait::async_trait]
1054impl TopicScorer for TopicScorerAdapter {
1055    async fn get_top_topics(&self, limit: u32) -> Result<Vec<String>, ContentLoopError> {
1056        let scores = storage::analytics::get_top_topics(&self.pool, limit)
1057            .await
1058            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
1059        Ok(scores.into_iter().map(|cs| cs.topic).collect())
1060    }
1061}
1062
1063// ============================================================================
1064// Posting queue adapters
1065// ============================================================================
1066
1067/// Adapts `mpsc::Sender<PostAction>` to the `PostSender` port trait.
1068pub struct PostSenderAdapter {
1069    tx: mpsc::Sender<PostAction>,
1070}
1071
1072impl PostSenderAdapter {
1073    pub fn new(tx: mpsc::Sender<PostAction>) -> Self {
1074        Self { tx }
1075    }
1076}
1077
1078#[async_trait::async_trait]
1079impl PostSender for PostSenderAdapter {
1080    async fn send_reply(&self, tweet_id: &str, content: &str) -> Result<(), LoopError> {
1081        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
1082        self.tx
1083            .send(PostAction::Reply {
1084                tweet_id: tweet_id.to_string(),
1085                content: content.to_string(),
1086                result_tx: Some(result_tx),
1087            })
1088            .await
1089            .map_err(|e| LoopError::Other(format!("posting queue send failed: {e}")))?;
1090
1091        result_rx
1092            .await
1093            .map_err(|e| LoopError::Other(format!("posting queue result recv failed: {e}")))?
1094            .map_err(|e| LoopError::Other(format!("post action failed: {e}")))?;
1095
1096        Ok(())
1097    }
1098}
1099
1100/// Adapts `DbPool` to the `ApprovalQueue` port trait.
1101pub struct ApprovalQueueAdapter {
1102    pool: DbPool,
1103}
1104
1105impl ApprovalQueueAdapter {
1106    pub fn new(pool: DbPool) -> Self {
1107        Self { pool }
1108    }
1109}
1110
1111#[async_trait::async_trait]
1112impl ApprovalQueue for ApprovalQueueAdapter {
1113    async fn queue_reply(&self, tweet_id: &str, content: &str) -> Result<i64, String> {
1114        storage::approval_queue::enqueue(
1115            &self.pool, "reply", tweet_id, "", // target_author not available here
1116            content, "",  // topic
1117            "",  // archetype
1118            0.0, // score
1119        )
1120        .await
1121        .map_err(|e| e.to_string())
1122    }
1123
1124    async fn queue_tweet(&self, content: &str) -> Result<i64, String> {
1125        storage::approval_queue::enqueue(
1126            &self.pool, "tweet", "", // no target tweet
1127            "", // no target author
1128            content, "",  // topic
1129            "",  // archetype
1130            0.0, // score
1131        )
1132        .await
1133        .map_err(|e| e.to_string())
1134    }
1135}
1136
1137// ============================================================================
1138// Status reporter adapter
1139// ============================================================================
1140
1141/// Adapts `DbPool` to the `StatusQuerier` port trait.
1142pub struct StatusQuerierAdapter {
1143    pool: DbPool,
1144}
1145
1146impl StatusQuerierAdapter {
1147    pub fn new(pool: DbPool) -> Self {
1148        Self { pool }
1149    }
1150}
1151
1152#[async_trait::async_trait]
1153impl StatusQuerier for StatusQuerierAdapter {
1154    async fn query_action_counts_since(
1155        &self,
1156        since: DateTime<Utc>,
1157    ) -> Result<ActionCounts, String> {
1158        let since_str = since.format("%Y-%m-%dT%H:%M:%SZ").to_string();
1159        let counts = storage::action_log::get_action_counts_since(&self.pool, &since_str)
1160            .await
1161            .map_err(|e| e.to_string())?;
1162
1163        Ok(ActionCounts {
1164            tweets_scored: *counts.get("tweet_scored").unwrap_or(&0) as u64,
1165            replies_sent: *counts.get("reply_sent").unwrap_or(&0) as u64,
1166            tweets_posted: *counts.get("tweet_posted").unwrap_or(&0) as u64,
1167            threads_posted: *counts.get("thread_posted").unwrap_or(&0) as u64,
1168        })
1169    }
1170}