Skip to main content

tuitbot_core/automation/adapters/
storage.rs

1//! Storage adapter implementations.
2
3use chrono::{DateTime, Utc};
4use tokio::sync::mpsc;
5
6use super::super::analytics_loop::{AnalyticsError, AnalyticsStorage};
7use super::super::loop_helpers::{
8    ContentLoopError, ContentStorage, LoopError, LoopStorage, LoopTweet, TopicScorer,
9};
10use super::super::posting_queue::PostAction;
11use super::super::target_loop::TargetStorage;
12use super::helpers::{parse_datetime, sqlx_to_content_error, storage_to_loop_error};
13use crate::storage::{self, DbPool};
14
15/// Adapts `DbPool` to the `LoopStorage` port trait.
16///
17/// Provides cursor persistence (via the `cursors` table), tweet dedup,
18/// discovered tweet recording, and action logging.
19pub struct StorageAdapter {
20    pool: DbPool,
21}
22
23impl StorageAdapter {
24    pub fn new(pool: DbPool) -> Self {
25        Self { pool }
26    }
27}
28
29#[async_trait::async_trait]
30impl LoopStorage for StorageAdapter {
31    async fn get_cursor(&self, key: &str) -> Result<Option<String>, LoopError> {
32        storage::cursors::get_cursor(&self.pool, key)
33            .await
34            .map_err(storage_to_loop_error)
35    }
36
37    async fn set_cursor(&self, key: &str, value: &str) -> Result<(), LoopError> {
38        storage::cursors::set_cursor(&self.pool, key, value)
39            .await
40            .map_err(storage_to_loop_error)
41    }
42
43    async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
44        storage::tweets::tweet_exists(&self.pool, tweet_id)
45            .await
46            .map_err(storage_to_loop_error)
47    }
48
49    async fn store_discovered_tweet(
50        &self,
51        tweet: &LoopTweet,
52        score: f32,
53        keyword: &str,
54    ) -> Result<(), LoopError> {
55        let discovered = storage::tweets::DiscoveredTweet {
56            id: tweet.id.clone(),
57            author_id: tweet.author_id.clone(),
58            author_username: tweet.author_username.clone(),
59            content: tweet.text.clone(),
60            like_count: tweet.likes as i64,
61            retweet_count: tweet.retweets as i64,
62            reply_count: tweet.replies as i64,
63            impression_count: None,
64            relevance_score: Some(score as f64),
65            matched_keyword: Some(keyword.to_string()),
66            discovered_at: Utc::now().to_rfc3339(),
67            replied_to: 0,
68        };
69        storage::tweets::insert_discovered_tweet(&self.pool, &discovered)
70            .await
71            .map_err(storage_to_loop_error)
72    }
73
74    async fn log_action(
75        &self,
76        action_type: &str,
77        status: &str,
78        message: &str,
79    ) -> Result<(), LoopError> {
80        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
81            .await
82            .map_err(storage_to_loop_error)
83    }
84}
85
86/// Adapts `DbPool` + posting queue to the `ContentStorage` port trait.
87pub struct ContentStorageAdapter {
88    pool: DbPool,
89    post_tx: mpsc::Sender<PostAction>,
90}
91
92impl ContentStorageAdapter {
93    pub fn new(pool: DbPool, post_tx: mpsc::Sender<PostAction>) -> Self {
94        Self { pool, post_tx }
95    }
96}
97
98#[async_trait::async_trait]
99impl ContentStorage for ContentStorageAdapter {
100    async fn last_tweet_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
101        let time_str = storage::threads::get_last_original_tweet_time(&self.pool)
102            .await
103            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
104        Ok(time_str.and_then(|s| parse_datetime(&s)))
105    }
106
107    async fn todays_tweet_times(&self) -> Result<Vec<DateTime<Utc>>, ContentLoopError> {
108        let time_strs = storage::threads::get_todays_tweet_times(&self.pool)
109            .await
110            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
111        Ok(time_strs.iter().filter_map(|s| parse_datetime(s)).collect())
112    }
113
114    async fn last_thread_time(&self) -> Result<Option<DateTime<Utc>>, ContentLoopError> {
115        let time_str = storage::threads::get_last_thread_time(&self.pool)
116            .await
117            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
118        Ok(time_str.and_then(|s| parse_datetime(&s)))
119    }
120
121    async fn post_tweet(&self, topic: &str, content: &str) -> Result<(), ContentLoopError> {
122        // Send to the posting queue and await result.
123        let (result_tx, result_rx) = tokio::sync::oneshot::channel();
124        self.post_tx
125            .send(PostAction::Tweet {
126                content: content.to_string(),
127                media_ids: vec![],
128                result_tx: Some(result_tx),
129            })
130            .await
131            .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?;
132
133        let tweet_id = result_rx
134            .await
135            .map_err(|e| ContentLoopError::PostFailed(e.to_string()))?
136            .map_err(ContentLoopError::PostFailed)?;
137
138        // Record in the database.
139        let original = storage::threads::OriginalTweet {
140            id: 0,
141            tweet_id: Some(tweet_id),
142            content: content.to_string(),
143            topic: Some(topic.to_string()),
144            llm_provider: None,
145            created_at: Utc::now().to_rfc3339(),
146            status: "sent".to_string(),
147            error_message: None,
148        };
149        storage::threads::insert_original_tweet(&self.pool, &original)
150            .await
151            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
152
153        // Increment rate limit.
154        storage::rate_limits::increment_rate_limit(&self.pool, "tweet")
155            .await
156            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
157
158        Ok(())
159    }
160
161    async fn create_thread(
162        &self,
163        topic: &str,
164        tweet_count: usize,
165    ) -> Result<String, ContentLoopError> {
166        let thread = storage::threads::Thread {
167            id: 0,
168            topic: topic.to_string(),
169            tweet_count: tweet_count as i64,
170            root_tweet_id: None,
171            created_at: Utc::now().to_rfc3339(),
172            status: "pending".to_string(),
173        };
174        let id = storage::threads::insert_thread(&self.pool, &thread)
175            .await
176            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
177        Ok(id.to_string())
178    }
179
180    async fn update_thread_status(
181        &self,
182        thread_id: &str,
183        status: &str,
184        tweet_count: usize,
185        root_tweet_id: Option<&str>,
186    ) -> Result<(), ContentLoopError> {
187        let id: i64 = thread_id
188            .parse()
189            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
190
191        sqlx::query(
192            "UPDATE threads SET status = ?1, tweet_count = ?2, root_tweet_id = ?3 WHERE id = ?4",
193        )
194        .bind(status)
195        .bind(tweet_count as i64)
196        .bind(root_tweet_id)
197        .bind(id)
198        .execute(&self.pool)
199        .await
200        .map_err(sqlx_to_content_error)?;
201
202        // If the thread was fully posted, increment the rate limit.
203        if status == "sent" {
204            storage::rate_limits::increment_rate_limit(&self.pool, "thread")
205                .await
206                .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
207        }
208
209        Ok(())
210    }
211
212    async fn store_thread_tweet(
213        &self,
214        thread_id: &str,
215        position: usize,
216        tweet_id: &str,
217        content: &str,
218    ) -> Result<(), ContentLoopError> {
219        let tid: i64 = thread_id
220            .parse()
221            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
222
223        sqlx::query(
224            "INSERT INTO thread_tweets (thread_id, position, tweet_id, content, created_at)
225             VALUES (?1, ?2, ?3, ?4, datetime('now'))",
226        )
227        .bind(tid)
228        .bind(position as i64)
229        .bind(tweet_id)
230        .bind(content)
231        .execute(&self.pool)
232        .await
233        .map_err(sqlx_to_content_error)?;
234
235        Ok(())
236    }
237
238    async fn log_action(
239        &self,
240        action_type: &str,
241        status: &str,
242        message: &str,
243    ) -> Result<(), ContentLoopError> {
244        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
245            .await
246            .map_err(|e| ContentLoopError::StorageError(e.to_string()))
247    }
248
249    async fn next_scheduled_item(&self) -> Result<Option<(i64, String, String)>, ContentLoopError> {
250        let items = storage::scheduled_content::get_due_items(&self.pool)
251            .await
252            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
253
254        Ok(items
255            .into_iter()
256            .next()
257            .map(|item| (item.id, item.content_type, item.content)))
258    }
259
260    async fn mark_scheduled_posted(
261        &self,
262        id: i64,
263        tweet_id: Option<&str>,
264    ) -> Result<(), ContentLoopError> {
265        storage::scheduled_content::update_status(&self.pool, id, "posted", tweet_id)
266            .await
267            .map_err(|e| ContentLoopError::StorageError(e.to_string()))
268    }
269
270    async fn mark_failed_permanent(
271        &self,
272        thread_id: &str,
273        error: &str,
274    ) -> Result<(), ContentLoopError> {
275        let id: i64 = thread_id
276            .parse()
277            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
278
279        // Update thread status to failed
280        sqlx::query(
281            "UPDATE threads SET status = ?1, failure_kind = ?2, last_error = ?3, failed_at = datetime('now') WHERE id = ?4",
282        )
283        .bind("failed")
284        .bind("permanent")
285        .bind(error)
286        .bind(id)
287        .execute(&self.pool)
288        .await
289        .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
290
291        // Fetch thread details for approval queue entry
292        // Concatenate all thread tweets into a single content string for the approval queue
293        let row: (String, u32) =
294            sqlx::query_as("SELECT topic, retry_count FROM threads WHERE id = ?1")
295                .bind(id)
296                .fetch_one(&self.pool)
297                .await
298                .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
299
300        let (topic, retry_count) = row;
301
302        // Fetch all tweets in the thread and concatenate them
303        let tweets: Vec<(String,)> = sqlx::query_as(
304            "SELECT content FROM thread_tweets WHERE thread_id = ?1 ORDER BY position",
305        )
306        .bind(id)
307        .fetch_all(&self.pool)
308        .await
309        .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
310
311        let content = if tweets.is_empty() {
312            format!("Failed thread id={}", id)
313        } else {
314            tweets
315                .iter()
316                .map(|t| t.0.as_str())
317                .collect::<Vec<_>>()
318                .join("\n---\n")
319        };
320
321        // Build metadata JSON for the approval queue entry
322        let metadata = format!(
323            "Failed thread id={}, retries={}, error: {}",
324            id, retry_count, error
325        );
326
327        // Insert into approval_queue with status="pending" for human review
328        sqlx::query(
329            "INSERT INTO approval_queue (action_type, generated_content, topic, status, reason) VALUES (?1, ?2, ?3, ?4, ?5)",
330        )
331        .bind("failed_post_recovery")
332        .bind(content)
333        .bind(topic)
334        .bind("pending")
335        .bind(metadata)
336        .execute(&self.pool)
337        .await
338        .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
339
340        Ok(())
341    }
342
343    async fn increment_retry(&self, thread_id: &str, error: &str) -> Result<u32, ContentLoopError> {
344        let id: i64 = thread_id
345            .parse()
346            .map_err(|_| ContentLoopError::StorageError("invalid thread_id".to_string()))?;
347
348        // Increment retry_count and update failure metadata
349        sqlx::query(
350            "UPDATE threads SET retry_count = retry_count + 1, failure_kind = ?1, last_error = ?2, failed_at = datetime('now') WHERE id = ?3",
351        )
352        .bind("transient")
353        .bind(error)
354        .bind(id)
355        .execute(&self.pool)
356        .await
357        .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
358
359        // Fetch updated retry_count
360        let row: (i64,) = sqlx::query_as("SELECT retry_count FROM threads WHERE id = ?1")
361            .bind(id)
362            .fetch_one(&self.pool)
363            .await
364            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
365
366        Ok(row.0 as u32)
367    }
368}
369
370/// Adapts `DbPool` to the `TargetStorage` port trait.
371pub struct TargetStorageAdapter {
372    pool: DbPool,
373}
374
375impl TargetStorageAdapter {
376    pub fn new(pool: DbPool) -> Self {
377        Self { pool }
378    }
379}
380
381#[async_trait::async_trait]
382impl TargetStorage for TargetStorageAdapter {
383    async fn upsert_target_account(
384        &self,
385        account_id: &str,
386        username: &str,
387    ) -> Result<(), LoopError> {
388        storage::target_accounts::upsert_target_account(&self.pool, account_id, username)
389            .await
390            .map_err(storage_to_loop_error)
391    }
392
393    async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
394        storage::target_accounts::target_tweet_exists(&self.pool, tweet_id)
395            .await
396            .map_err(storage_to_loop_error)
397    }
398
399    async fn store_target_tweet(
400        &self,
401        tweet_id: &str,
402        account_id: &str,
403        content: &str,
404        created_at: &str,
405        reply_count: i64,
406        like_count: i64,
407        relevance_score: f64,
408    ) -> Result<(), LoopError> {
409        storage::target_accounts::store_target_tweet(
410            &self.pool,
411            tweet_id,
412            account_id,
413            content,
414            created_at,
415            reply_count,
416            like_count,
417            relevance_score,
418        )
419        .await
420        .map_err(storage_to_loop_error)
421    }
422
423    async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError> {
424        storage::target_accounts::mark_target_tweet_replied(&self.pool, tweet_id)
425            .await
426            .map_err(storage_to_loop_error)
427    }
428
429    async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError> {
430        storage::target_accounts::record_target_reply(&self.pool, account_id)
431            .await
432            .map_err(storage_to_loop_error)
433    }
434
435    async fn count_target_replies_today(&self) -> Result<i64, LoopError> {
436        storage::target_accounts::count_target_replies_today(&self.pool)
437            .await
438            .map_err(storage_to_loop_error)
439    }
440
441    async fn log_action(
442        &self,
443        action_type: &str,
444        status: &str,
445        message: &str,
446    ) -> Result<(), LoopError> {
447        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
448            .await
449            .map_err(storage_to_loop_error)
450    }
451}
452
453/// Adapts `DbPool` to the `AnalyticsStorage` port trait.
454pub struct AnalyticsStorageAdapter {
455    pool: DbPool,
456}
457
458impl AnalyticsStorageAdapter {
459    pub fn new(pool: DbPool) -> Self {
460        Self { pool }
461    }
462}
463
464#[async_trait::async_trait]
465impl AnalyticsStorage for AnalyticsStorageAdapter {
466    async fn store_follower_snapshot(
467        &self,
468        followers: i64,
469        following: i64,
470        tweets: i64,
471    ) -> Result<(), AnalyticsError> {
472        storage::analytics::upsert_follower_snapshot(&self.pool, followers, following, tweets)
473            .await
474            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
475    }
476
477    async fn get_yesterday_followers(&self) -> Result<Option<i64>, AnalyticsError> {
478        let row: Option<(i64,)> = sqlx::query_as(
479            "SELECT follower_count FROM follower_snapshots
480             WHERE snapshot_date < date('now')
481             ORDER BY snapshot_date DESC LIMIT 1",
482        )
483        .fetch_optional(&self.pool)
484        .await
485        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
486        Ok(row.map(|(c,)| c))
487    }
488
489    async fn get_replies_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
490        let rows: Vec<(String,)> = sqlx::query_as(
491            "SELECT rs.reply_tweet_id FROM replies_sent rs
492             WHERE rs.status = 'sent'
493               AND rs.reply_tweet_id IS NOT NULL
494               AND rs.created_at >= datetime('now', '-25 hours')
495               AND rs.created_at <= datetime('now', '-23 hours')
496               AND NOT EXISTS (
497                   SELECT 1 FROM reply_performance rp WHERE rp.reply_id = rs.reply_tweet_id
498               )",
499        )
500        .fetch_all(&self.pool)
501        .await
502        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
503        Ok(rows.into_iter().map(|(id,)| id).collect())
504    }
505
506    async fn get_tweets_needing_measurement(&self) -> Result<Vec<String>, AnalyticsError> {
507        let rows: Vec<(String,)> = sqlx::query_as(
508            "SELECT ot.tweet_id FROM original_tweets ot
509             WHERE ot.status = 'sent'
510               AND ot.tweet_id IS NOT NULL
511               AND ot.created_at >= datetime('now', '-25 hours')
512               AND ot.created_at <= datetime('now', '-23 hours')
513               AND NOT EXISTS (
514                   SELECT 1 FROM tweet_performance tp WHERE tp.tweet_id = ot.tweet_id
515               )",
516        )
517        .fetch_all(&self.pool)
518        .await
519        .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
520        Ok(rows.into_iter().map(|(id,)| id).collect())
521    }
522
523    async fn store_reply_performance(
524        &self,
525        reply_id: &str,
526        likes: i64,
527        replies: i64,
528        impressions: i64,
529        score: f64,
530    ) -> Result<(), AnalyticsError> {
531        storage::analytics::upsert_reply_performance(
532            &self.pool,
533            reply_id,
534            likes,
535            replies,
536            impressions,
537            score,
538        )
539        .await
540        .map_err(|e| AnalyticsError::StorageError(e.to_string()))
541    }
542
543    async fn store_tweet_performance(
544        &self,
545        tweet_id: &str,
546        likes: i64,
547        retweets: i64,
548        replies: i64,
549        impressions: i64,
550        score: f64,
551    ) -> Result<(), AnalyticsError> {
552        storage::analytics::upsert_tweet_performance(
553            &self.pool,
554            tweet_id,
555            likes,
556            retweets,
557            replies,
558            impressions,
559            score,
560        )
561        .await
562        .map_err(|e| AnalyticsError::StorageError(e.to_string()))
563    }
564
565    async fn update_content_score(
566        &self,
567        topic: &str,
568        format: &str,
569        score: f64,
570    ) -> Result<(), AnalyticsError> {
571        storage::analytics::update_content_score(&self.pool, topic, format, score)
572            .await
573            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
574    }
575
576    async fn log_action(
577        &self,
578        action_type: &str,
579        status: &str,
580        message: &str,
581    ) -> Result<(), AnalyticsError> {
582        storage::action_log::log_action(&self.pool, action_type, status, Some(message), None)
583            .await
584            .map_err(|e| AnalyticsError::StorageError(e.to_string()))
585    }
586
587    async fn run_aggregations(&self) -> Result<(), AnalyticsError> {
588        let account_id = storage::accounts::DEFAULT_ACCOUNT_ID;
589        storage::analytics::aggregate_best_times_for(&self.pool, account_id)
590            .await
591            .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
592        storage::analytics::aggregate_reach_for(&self.pool, account_id)
593            .await
594            .map_err(|e| AnalyticsError::StorageError(e.to_string()))?;
595        Ok(())
596    }
597}
598
599/// Adapts `DbPool` to the `TopicScorer` port trait.
600pub struct TopicScorerAdapter {
601    pool: DbPool,
602}
603
604impl TopicScorerAdapter {
605    pub fn new(pool: DbPool) -> Self {
606        Self { pool }
607    }
608}
609
610#[async_trait::async_trait]
611impl TopicScorer for TopicScorerAdapter {
612    async fn get_top_topics(&self, limit: u32) -> Result<Vec<String>, ContentLoopError> {
613        let scores = storage::analytics::get_top_topics(&self.pool, limit)
614            .await
615            .map_err(|e| ContentLoopError::StorageError(e.to_string()))?;
616        Ok(scores.into_iter().map(|cs| cs.topic).collect())
617    }
618}