Skip to main content

tuitbot_core/automation/adapters/
storage.rs

1//! Storage adapter implementations.
2
3use chrono::{DateTime, Utc};
4use tokio::sync::mpsc;
5
6use super::super::analytics_loop::{AnalyticsError, AnalyticsStorage};
7use super::super::loop_helpers::{
8    ContentLoopError, ContentStorage, LoopError, LoopStorage, LoopTweet, TopicScorer,
9};
10use super::super::posting_queue::PostAction;
11use super::super::target_loop::TargetStorage;
12use super::helpers::{parse_datetime, sqlx_to_content_error, storage_to_loop_error};
13use crate::storage::{self, DbPool};
14
15/// Adapts `DbPool` to the `LoopStorage` port trait.
16///
17/// Provides cursor persistence (via the `cursors` table), tweet dedup,
18/// discovered tweet recording, and action logging.
19pub struct StorageAdapter {
20    pool: DbPool,
21}
22
23impl StorageAdapter {
24    pub fn new(pool: DbPool) -> Self {
25        Self { pool }
26    }
27}
28
29#[async_trait::async_trait]
30impl LoopStorage for StorageAdapter {
31    async fn get_cursor(&self, key: &str) -> Result<Option<String>, LoopError> {
32        storage::cursors::get_cursor(&self.pool, key)
33            .await
34            .map_err(storage_to_loop_error)
35    }
36
37    async fn set_cursor(&self, key: &str, value: &str) -> Result<(), LoopError> {
38        storage::cursors::set_cursor(&self.pool, key, value)
39            .await
40            .map_err(storage_to_loop_error)
41    }
42
43    async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
44        storage::tweets::tweet_exists(&self.pool, tweet_id)
45            .await
46            .map_err(storage_to_loop_error)
47    }
48
49    async fn store_discovered_tweet(
50        &self,
51        tweet: &LoopTweet,
52        score: f32,
53        keyword: &str,
54    ) -> Result<(), LoopError> {
55        let discovered = storage::tweets::DiscoveredTweet {
56            id: tweet.id.clone(),
57            author_id: tweet.author_id.clone(),
58            author_username: tweet.author_username.clone(),
59            content: tweet.text.clone(),
60            like_count: tweet.likes as i64,
61            retweet_count: tweet.retweets as i64,
62            reply_count: tweet.replies as i64,
63            impression_count: None,
64            relevance_score: Some(score as f64),
65            matched_keyword: Some(keyword.to_string()),
66            discovered_at: Utc::now().to_rfc3339(),
67            replied_to: 0,
68        };
69        storage::tweets::insert_discovered_tweet(&self.pool, &discovered)
70            .await
71            .map_err(storage_to_loop_error)
72    }
73
74    async fn log_action(
75        &self,
76        action_type: &str,
77        status: &str,
78        message: &str,
79    ) -> Result<(), LoopError> {
80        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
81            .await
82            .map_err(storage_to_loop_error)
83    }
84}
85
86/// Adapts `DbPool` + posting queue to the `ContentStorage` port trait.
87pub struct ContentStorageAdapter {
88    pool: DbPool,
89    post_tx: mpsc::Sender<PostAction>,
90}
91
92impl ContentStorageAdapter {
93    pub fn new(pool: DbPool, post_tx: mpsc::Sender<PostAction>) -> Self {
94        Self { pool, post_tx }
95    }
96}
97
98#[async_trait::async_trait]
99impl ContentStorage for ContentStorageAdapter {
100    async fn last_tweet_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
101        let time_str = storage::threads::get_last_original_tweet_time(&self.pool)
102            .await
103            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
104        Ok(time_str.and_then(|s| parse_datetime(&s)))
105    }
106
107    async fn todays_tweet_times(&self) -> Result<Vec<DateTime<Utc>>, ContentLoopError> {
108        let time_strs = storage::threads::get_todays_tweet_times(&self.pool)
109            .await
110            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
111        Ok(time_strs.iter().filter_map(|s| parse_datetime(s)).collect())
112    }
113
114    async fn last_thread_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
115        let time_str = storage::threads::get_last_thread_time(&self.pool)
116            .await
117            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
118        Ok(time_str.and_then(|s| parse_datetime(&s)))
119    }
120
121    async fn post_tweet(&self, topic: &str, content: &str) -> Result<(), ContentLoopError> {
122        // Send to the posting queue and await result.
123        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
124        self.post_tx
125            .send(PostAction::Tweet {
126                content: content.to_string(),
127                media_ids: vec![],
128                result_tx: Some(result_tx),
129            })
130            .await
131            .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?;
132
133        let tweet_id = result_rx
134            .await
135            .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?
136            .map_err(ContentLoopError::PostFailed)?;
137
138        // Record in the database.
139        let original = storage::threads::OriginalTweet {
140            id: 0,
141            tweet_id: Some(tweet_id),
142            content: content.to_string(),
143            topic: Some(topic.to_string()),
144            llm_provider: None,
145            created_at: Utc::now().to_rfc3339(),
146            status: "sent".to_string(),
147            error_message: None,
148        };
149        storage::threads::insert_original_tweet(&self.pool, &original)
150            .await
151            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
152
153        // Increment rate limit.
154        storage::rate_limits::increment_rate_limit(&self.pool, "tweet")
155            .await
156            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
157
158        Ok(())
159    }
160
161    async fn create_thread(
162        &self,
163        topic: &str,
164        tweet_count: usize,
165    ) -> Result<String, ContentLoopError> {
166        let thread = storage::threads::Thread {
167            id: 0,
168            topic: topic.to_string(),
169            tweet_count: tweet_count as i64,
170            root_tweet_id: None,
171            created_at: Utc::now().to_rfc3339(),
172            status: "pending".to_string(),
173        };
174        let id = storage::threads::insert_thread(&self.pool, &thread)
175            .await
176            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
177        Ok(id.to_string())
178    }
179
180    async fn update_thread_status(
181        &self,
182        thread_id: &str,
183        status: &str,
184        tweet_count: usize,
185        root_tweet_id: Option<&str>,
186    ) -> Result<(), ContentLoopError> {
187        let id: i64 = thread_id
188            .parse()
189            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
190
191        sqlx::query(
192            "UPDATE threads SET status = ?1, tweet_count = ?2, root_tweet_id = ?3 WHERE id = ?4",
193        )
194        .bind(status)
195        .bind(tweet_count as i64)
196        .bind(root_tweet_id)
197        .bind(id)
198        .execute(&self.pool)
199        .await
200        .map_err(sqlx_to_content_error)?;
201
202        // If the thread was fully posted, increment the rate limit.
203        if status == "sent" {
204            storage::rate_limits::increment_rate_limit(&self.pool, "thread")
205                .await
206                .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
207        }
208
209        Ok(())
210    }
211
212    async fn store_thread_tweet(
213        &self,
214        thread_id: &str,
215        position: usize,
216        tweet_id: &str,
217        content: &str,
218    ) -> Result<(), ContentLoopError> {
219        let tid: i64 = thread_id
220            .parse()
221            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
222
223        sqlx::query(
224            "INSERT INTO thread_tweets (thread_id, position, tweet_id, content, created_at)
225             VALUES (?1, ?2, ?3, ?4, datetime('now'))",
226        )
227        .bind(tid)
228        .bind(position as i64)
229        .bind(tweet_id)
230        .bind(content)
231        .execute(&self.pool)
232        .await
233        .map_err(sqlx_to_content_error)?;
234
235        Ok(())
236    }
237
238    async fn log_action(
239        &self,
240        action_type: &str,
241        status: &str,
242        message: &str,
243    ) -> Result<(), ContentLoopError> {
244        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
245            .await
246            .map_err(|e| ContentLoopError::StorageError(e.to_string()))
247    }
248
249    async fn next_scheduled_item(&self) -> Result<Option<(i64, String, String)>, ContentLoopError> {
250        let items = storage::scheduled_content::get_due_items(&self.pool)
251            .await
252            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
253
254        Ok(items
255            .into_iter()
256            .next()
257            .map(|item| (item.id, item.content_type, item.content)))
258    }
259
260    async fn mark_scheduled_posted(
261        &self,
262        id: i64,
263        tweet_id: Option<&str>,
264    ) -> Result<(), ContentLoopError> {
265        storage::scheduled_content::update_status(&self.pool, id, "posted", tweet_id)
266            .await
267            .map_err(|e| ContentLoopError::StorageError(e.to_string()))
268    }
269}
270
271/// Adapts `DbPool` to the `TargetStorage` port trait.
272pub struct TargetStorageAdapter {
273    pool: DbPool,
274}
275
276impl TargetStorageAdapter {
277    pub fn new(pool: DbPool) -> Self {
278        Self { pool }
279    }
280}
281
282#[async_trait::async_trait]
283impl TargetStorage for TargetStorageAdapter {
284    async fn upsert_target_account(
285        &self,
286        account_id: &str,
287        username: &str,
288    ) -> Result<(), LoopError> {
289        storage::target_accounts::upsert_target_account(&self.pool, account_id, username)
290            .await
291            .map_err(storage_to_loop_error)
292    }
293
294    async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
295        storage::target_accounts::target_tweet_exists(&self.pool, tweet_id)
296            .await
297            .map_err(storage_to_loop_error)
298    }
299
300    async fn store_target_tweet(
301        &self,
302        tweet_id: &str,
303        account_id: &str,
304        content: &str,
305        created_at: &str,
306        reply_count: i64,
307        like_count: i64,
308        relevance_score: f64,
309    ) -> Result<(), LoopError> {
310        storage::target_accounts::store_target_tweet(
311            &self.pool,
312            tweet_id,
313            account_id,
314            content,
315            created_at,
316            reply_count,
317            like_count,
318            relevance_score,
319        )
320        .await
321        .map_err(storage_to_loop_error)
322    }
323
324    async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError> {
325        storage::target_accounts::mark_target_tweet_replied(&self.pool, tweet_id)
326            .await
327            .map_err(storage_to_loop_error)
328    }
329
330    async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError> {
331        storage::target_accounts::record_target_reply(&self.pool, account_id)
332            .await
333            .map_err(storage_to_loop_error)
334    }
335
336    async fn count_target_replies_today(&self) -> Result<i64, LoopError> {
337        storage::target_accounts::count_target_replies_today(&self.pool)
338            .await
339            .map_err(storage_to_loop_error)
340    }
341
342    async fn log_action(
343        &self,
344        action_type: &str,
345        status: &str,
346        message: &str,
347    ) -> Result<(), LoopError> {
348        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
349            .await
350            .map_err(storage_to_loop_error)
351    }
352}
353
354/// Adapts `DbPool` to the `AnalyticsStorage` port trait.
355pub struct AnalyticsStorageAdapter {
356    pool: DbPool,
357}
358
359impl AnalyticsStorageAdapter {
360    pub fn new(pool: DbPool) -> Self {
361        Self { pool }
362    }
363}
364
365#[async_trait::async_trait]
366impl AnalyticsStorage for AnalyticsStorageAdapter {
367    async fn store_follower_snapshot(
368        &self,
369        followers: i64,
370        following: i64,
371        tweets: i64,
372    ) -> Result<(), AnalyticsError> {
373        storage::analytics::upsert_follower_snapshot(&self.pool, followers, following, tweets)
374            .await
375            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
376    }
377
378    async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError> {
379        let row: Option<(i64,)> = sqlx::query_as(
380            "SELECT follower_count FROM follower_snapshots
381             WHERE snapshot_date < date('now')
382             ORDER BY snapshot_date DESC LIMIT 1",
383        )
384        .fetch_optional(&self.pool)
385        .await
386        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
387        Ok(row.map(|(c,)| c))
388    }
389
390    async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
391        let rows: Vec<(String,)> = sqlx::query_as(
392            "SELECT rs.reply_tweet_id FROM replies_sent rs
393             WHERE rs.status = 'sent'
394               AND rs.reply_tweet_id IS NOT NULL
395               AND rs.created_at >= datetime('now', '-25 hours')
396               AND rs.created_at <= datetime('now', '-23 hours')
397               AND NOT EXISTS (
398                   SELECT 1 FROM reply_performance rp WHERE rp.reply_id = rs.reply_tweet_id
399               )",
400        )
401        .fetch_all(&self.pool)
402        .await
403        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
404        Ok(rows.into_iter().map(|(id,)| id).collect())
405    }
406
407    async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
408        let rows: Vec<(String,)> = sqlx::query_as(
409            "SELECT ot.tweet_id FROM original_tweets ot
410             WHERE ot.status = 'sent'
411               AND ot.tweet_id IS NOT NULL
412               AND ot.created_at >= datetime('now', '-25 hours')
413               AND ot.created_at <= datetime('now', '-23 hours')
414               AND NOT EXISTS (
415                   SELECT 1 FROM tweet_performance tp WHERE tp.tweet_id = ot.tweet_id
416               )",
417        )
418        .fetch_all(&self.pool)
419        .await
420        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
421        Ok(rows.into_iter().map(|(id,)| id).collect())
422    }
423
424    async fn store_reply_performance(
425        &self,
426        reply_id: &str,
427        likes: i64,
428        replies: i64,
429        impressions: i64,
430        score: f64,
431    ) -> Result<(), AnalyticsError> {
432        storage::analytics::upsert_reply_performance(
433            &self.pool,
434            reply_id,
435            likes,
436            replies,
437            impressions,
438            score,
439        )
440        .await
441        .map_err(|e| AnalyticsError::StorageError(e.to_string()))
442    }
443
444    async fn store_tweet_performance(
445        &self,
446        tweet_id: &str,
447        likes: i64,
448        retweets: i64,
449        replies: i64,
450        impressions: i64,
451        score: f64,
452    ) -> Result<(), AnalyticsError> {
453        storage::analytics::upsert_tweet_performance(
454            &self.pool,
455            tweet_id,
456            likes,
457            retweets,
458            replies,
459            impressions,
460            score,
461        )
462        .await
463        .map_err(|e| AnalyticsError::StorageError(e.to_string()))
464    }
465
466    async fn update_content_score(
467        &self,
468        topic: &str,
469        format: &str,
470        score: f64,
471    ) -> Result<(), AnalyticsError> {
472        storage::analytics::update_content_score(&self.pool, topic, format, score)
473            .await
474            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
475    }
476
477    async fn log_action(
478        &self,
479        action_type: &str,
480        status: &str,
481        message: &str,
482    ) -> Result<(), AnalyticsError> {
483        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
484            .await
485            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
486    }
487}
488
489/// Adapts `DbPool` to the `TopicScorer` port trait.
490pub struct TopicScorerAdapter {
491    pool: DbPool,
492}
493
494impl TopicScorerAdapter {
495    pub fn new(pool: DbPool) -> Self {
496        Self { pool }
497    }
498}
499
500#[async_trait::async_trait]
501impl TopicScorer for TopicScorerAdapter {
502    async fn get_top_topics(&self, limit: u32) -> Result<Vec<String>, ContentLoopError> {
503        let scores = storage::analytics::get_top_topics(&self.pool, limit)
504            .await
505            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
506        Ok(scores.into_iter().map(|cs| cs.topic).collect())
507    }
508}