Skip to main content

tuitbot_core/automation/
loop_helpers.rs

1//! Shared types, traits, and helpers for automation loops.
2//!
3//! Defines port traits that decouple loop logic from concrete
4//! implementations (X API, LLM, storage, safety). When all work
5//! packages are merged, adapter implementations bridge these traits
6//! to the actual types.
7
8use std::fmt;
9use std::time::Duration;
10
11// ============================================================================
12// WP08 types: Mentions + Discovery loops
13// ============================================================================
14
15/// Tweet data as seen by automation loop logic.
16///
17/// A common representation used by both mentions and discovery loops,
18/// decoupled from any specific API response type.
19#[derive(Debug, Clone)]
20pub struct LoopTweet {
21    /// Unique tweet ID.
22    pub id: String,
23    /// Tweet text content.
24    pub text: String,
25    /// Author's user ID.
26    pub author_id: String,
27    /// Author's username (without @).
28    pub author_username: String,
29    /// Author's follower count.
30    pub author_followers: u64,
31    /// ISO-8601 creation timestamp.
32    pub created_at: String,
33    /// Number of likes.
34    pub likes: u64,
35    /// Number of retweets.
36    pub retweets: u64,
37    /// Number of replies.
38    pub replies: u64,
39}
40
41/// Result of scoring a tweet for reply-worthiness.
42#[derive(Debug, Clone)]
43pub struct ScoreResult {
44    /// Total score (0-100).
45    pub total: f32,
46    /// Whether the score meets the configured threshold.
47    pub meets_threshold: bool,
48    /// Keywords that matched in the tweet.
49    pub matched_keywords: Vec<String>,
50}
51
52/// Errors that can occur in mentions/discovery automation loops.
53///
54/// Wraps specific error categories to enable appropriate handling
55/// (e.g., back off on rate limit, skip on LLM failure, refresh on auth expiry).
56#[derive(Debug)]
57pub enum LoopError {
58    /// X API rate limit hit.
59    RateLimited {
60        /// Seconds to wait before retrying, if known.
61        retry_after: Option<u64>,
62    },
63    /// OAuth token expired and needs refresh.
64    AuthExpired,
65    /// LLM content generation failed.
66    LlmFailure(String),
67    /// Network-level error.
68    NetworkError(String),
69    /// Database/storage error.
70    StorageError(String),
71    /// Any other error.
72    Other(String),
73}
74
75impl fmt::Display for LoopError {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        match self {
78            LoopError::RateLimited { retry_after } => match retry_after {
79                Some(secs) => write!(f, "rate limited, retry after {secs}s"),
80                None => write!(f, "rate limited"),
81            },
82            LoopError::AuthExpired => write!(f, "authentication expired"),
83            LoopError::LlmFailure(msg) => write!(f, "LLM failure: {msg}"),
84            LoopError::NetworkError(msg) => write!(f, "network error: {msg}"),
85            LoopError::StorageError(msg) => write!(f, "storage error: {msg}"),
86            LoopError::Other(msg) => write!(f, "{msg}"),
87        }
88    }
89}
90
91// ============================================================================
92// WP09 types: Content + Thread loops
93// ============================================================================
94
95/// Errors that can occur in the content/thread automation loops.
96#[derive(Debug)]
97pub enum ContentLoopError {
98    /// LLM generation failed.
99    LlmFailure(String),
100    /// Posting to X failed.
101    PostFailed(String),
102    /// Storage/database error.
103    StorageError(String),
104    /// Network error.
105    NetworkError(String),
106    /// Other error.
107    Other(String),
108}
109
110impl fmt::Display for ContentLoopError {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        match self {
113            Self::LlmFailure(msg) => write!(f, "LLM failure: {msg}"),
114            Self::PostFailed(msg) => write!(f, "Post failed: {msg}"),
115            Self::StorageError(msg) => write!(f, "Storage error: {msg}"),
116            Self::NetworkError(msg) => write!(f, "Network error: {msg}"),
117            Self::Other(msg) => write!(f, "{msg}"),
118        }
119    }
120}
121
122impl std::error::Error for ContentLoopError {}
123
124// ============================================================================
125// WP08 port traits: Mentions + Discovery loops
126// ============================================================================
127
128/// Port for fetching @-mentions from X API.
129#[async_trait::async_trait]
130pub trait MentionsFetcher: Send + Sync {
131    /// Fetch mentions since the given ID. Returns newest first.
132    async fn get_mentions(&self, since_id: Option<&str>) -> Result<Vec<LoopTweet>, LoopError>;
133}
134
135/// Port for searching tweets by keyword.
136#[async_trait::async_trait]
137pub trait TweetSearcher: Send + Sync {
138    /// Search for tweets matching the query.
139    async fn search_tweets(&self, query: &str) -> Result<Vec<LoopTweet>, LoopError>;
140}
141
142/// Output from reply generation, carrying both the text and optional vault citations.
143#[derive(Debug, Clone)]
144pub struct ReplyOutput {
145    /// The generated reply text.
146    pub text: String,
147    /// Vault citations used to ground the reply (empty when no vault context).
148    pub vault_citations: Vec<crate::context::retrieval::VaultCitation>,
149}
150
151/// Port for generating reply content via LLM.
152#[async_trait::async_trait]
153pub trait ReplyGenerator: Send + Sync {
154    /// Generate a reply to the given tweet.
155    ///
156    /// When `mention_product` is true, the reply may reference the product.
157    /// When false, the reply must be purely helpful with no product mention.
158    async fn generate_reply(
159        &self,
160        tweet_text: &str,
161        author: &str,
162        mention_product: bool,
163    ) -> Result<String, LoopError>;
164
165    /// Generate a reply with optional RAG context from the vault.
166    ///
167    /// Default implementation ignores RAG context and delegates to `generate_reply`.
168    /// Override in adapters that support vault-aware generation.
169    async fn generate_reply_with_rag(
170        &self,
171        tweet_text: &str,
172        author: &str,
173        mention_product: bool,
174    ) -> Result<ReplyOutput, LoopError> {
175        let text = self
176            .generate_reply(tweet_text, author, mention_product)
177            .await?;
178        Ok(ReplyOutput {
179            text,
180            vault_citations: vec![],
181        })
182    }
183}
184
185/// Port for safety checks (rate limits and dedup).
186#[async_trait::async_trait]
187pub trait SafetyChecker: Send + Sync {
188    /// Check if we can reply (under daily rate limit).
189    async fn can_reply(&self) -> bool;
190
191    /// Check if we've already replied to this tweet.
192    async fn has_replied_to(&self, tweet_id: &str) -> bool;
193
194    /// Record a reply for dedup and rate limit tracking.
195    async fn record_reply(&self, tweet_id: &str, reply_content: &str) -> Result<(), LoopError>;
196}
197
198/// Port for scoring tweets.
199pub trait TweetScorer: Send + Sync {
200    /// Score a tweet for reply-worthiness.
201    fn score(&self, tweet: &LoopTweet) -> ScoreResult;
202}
203
204/// Port for persisting loop state (since_id, discovered tweets, action log).
205#[async_trait::async_trait]
206pub trait LoopStorage: Send + Sync {
207    /// Get a persisted cursor value (e.g., since_id for mentions).
208    async fn get_cursor(&self, key: &str) -> Result<Option<String>, LoopError>;
209
210    /// Set a persisted cursor value.
211    async fn set_cursor(&self, key: &str, value: &str) -> Result<(), LoopError>;
212
213    /// Check if a discovered tweet already exists (dedup by tweet ID).
214    async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError>;
215
216    /// Store a discovered tweet with its score and matched keyword.
217    async fn store_discovered_tweet(
218        &self,
219        tweet: &LoopTweet,
220        score: f32,
221        keyword: &str,
222    ) -> Result<(), LoopError>;
223
224    /// Log an action (for audit trail and status reporting).
225    async fn log_action(
226        &self,
227        action_type: &str,
228        status: &str,
229        message: &str,
230    ) -> Result<(), LoopError>;
231}
232
233/// Port for sending post actions to the posting queue.
234#[async_trait::async_trait]
235pub trait PostSender: Send + Sync {
236    /// Send a reply to a tweet through the posting queue.
237    async fn send_reply(&self, tweet_id: &str, content: &str) -> Result<(), LoopError>;
238}
239
240// ============================================================================
241// WP09 port traits: Content + Thread loops
242// ============================================================================
243
244/// Queries top-performing topics for epsilon-greedy topic selection.
245#[async_trait::async_trait]
246pub trait TopicScorer: Send + Sync {
247    /// Get the top-performing topics, ordered by score descending.
248    async fn get_top_topics(&self, limit: u32) -> Result<Vec<String>, ContentLoopError>;
249}
250
251/// Generates individual tweets on a given topic.
252#[async_trait::async_trait]
253pub trait TweetGenerator: Send + Sync {
254    /// Generate an educational tweet on the given topic.
255    async fn generate_tweet(&self, topic: &str) -> Result<String, ContentLoopError>;
256}
257
258/// Checks safety limits for content posting.
259#[async_trait::async_trait]
260pub trait ContentSafety: Send + Sync {
261    /// Check if a tweet can be posted (daily limit not reached).
262    async fn can_post_tweet(&self) -> bool;
263    /// Check if a thread can be posted (weekly limit not reached).
264    async fn can_post_thread(&self) -> bool;
265}
266
267/// Storage operations for content and thread loops.
268#[async_trait::async_trait]
269pub trait ContentStorage: Send + Sync {
270    /// Get the timestamp of the most recent posted tweet.
271    async fn last_tweet_time(
272        &self,
273    ) -> Result<Option<chrono::DateTime<chrono::Utc>>, ContentLoopError>;
274
275    /// Get the timestamp of the most recent posted thread.
276    async fn last_thread_time(
277        &self,
278    ) -> Result<Option<chrono::DateTime<chrono::Utc>>, ContentLoopError>;
279
280    /// Get the timestamps of all tweets posted today (for slot-based scheduling).
281    async fn todays_tweet_times(
282        &self,
283    ) -> Result<Vec<chrono::DateTime<chrono::Utc>>, ContentLoopError>;
284
285    /// Post a tweet (sends to posting queue and records in DB).
286    async fn post_tweet(&self, topic: &str, content: &str) -> Result<(), ContentLoopError>;
287
288    /// Create a thread record in the database. Returns the thread ID.
289    async fn create_thread(
290        &self,
291        topic: &str,
292        tweet_count: usize,
293    ) -> Result<String, ContentLoopError>;
294
295    /// Update thread status (pending, posting, sent, partial).
296    async fn update_thread_status(
297        &self,
298        thread_id: &str,
299        status: &str,
300        tweet_count: usize,
301        root_tweet_id: Option<&str>,
302    ) -> Result<(), ContentLoopError>;
303
304    /// Record a thread tweet (position in reply chain).
305    async fn store_thread_tweet(
306        &self,
307        thread_id: &str,
308        position: usize,
309        tweet_id: &str,
310        content: &str,
311    ) -> Result<(), ContentLoopError>;
312
313    /// Log an action to the audit trail.
314    async fn log_action(
315        &self,
316        action_type: &str,
317        status: &str,
318        message: &str,
319    ) -> Result<(), ContentLoopError>;
320
321    /// Check for scheduled content that is due for posting.
322    ///
323    /// Returns the next due item (content, id) or None.
324    async fn next_scheduled_item(&self) -> Result<Option<(i64, String, String)>, ContentLoopError> {
325        // Default: no scheduled content support.
326        Ok(None)
327    }
328
329    /// Mark a scheduled content item as posted.
330    async fn mark_scheduled_posted(
331        &self,
332        _id: i64,
333        _tweet_id: Option<&str>,
334    ) -> Result<(), ContentLoopError> {
335        Ok(())
336    }
337
338    /// Mark a thread as permanently failed (no more retries).
339    /// Stores error reason and failure timestamp.
340    async fn mark_failed_permanent(
341        &self,
342        thread_id: &str,
343        error: &str,
344    ) -> Result<(), ContentLoopError> {
345        // Default: no-op for implementations that don't track failure state.
346        let _ = (thread_id, error);
347        Ok(())
348    }
349
350    /// Increment retry count for a thread (for transient failures).
351    /// Returns the new retry count; implementation tracks failure_kind='transient'.
352    async fn increment_retry(&self, thread_id: &str, error: &str) -> Result<u32, ContentLoopError> {
353        // Default: no-op, return 0 (no retries tracked).
354        let _ = (thread_id, error);
355        Ok(0)
356    }
357}
358
359/// Posts tweets directly to X (for thread reply chains).
360///
361/// Thread tweets bypass the posting queue because reply chain
362/// order must be maintained -- each tweet must reply to the previous.
363#[async_trait::async_trait]
364pub trait ThreadPoster: Send + Sync {
365    /// Post a standalone tweet. Returns the tweet ID.
366    async fn post_tweet(&self, content: &str) -> Result<String, ContentLoopError>;
367
368    /// Reply to a tweet. Returns the new tweet ID.
369    async fn reply_to_tweet(
370        &self,
371        in_reply_to: &str,
372        content: &str,
373    ) -> Result<String, ContentLoopError>;
374}
375
376// ============================================================================
377// Shared utilities
378// ============================================================================
379
380/// Tracks consecutive errors to prevent infinite retry loops.
381///
382/// If a loop encounters `max_consecutive` errors without a single
383/// success, it should pause for `pause_duration` before retrying.
384#[derive(Debug)]
385pub struct ConsecutiveErrorTracker {
386    count: u32,
387    max_consecutive: u32,
388    pause_duration: Duration,
389}
390
391impl ConsecutiveErrorTracker {
392    /// Create a new tracker.
393    ///
394    /// - `max_consecutive`: Number of consecutive errors before pausing.
395    /// - `pause_duration`: How long to pause after hitting the limit.
396    pub fn new(max_consecutive: u32, pause_duration: Duration) -> Self {
397        Self {
398            count: 0,
399            max_consecutive,
400            pause_duration,
401        }
402    }
403
404    /// Record an error. Returns true if the loop should pause.
405    pub fn record_error(&mut self) -> bool {
406        self.count += 1;
407        self.count >= self.max_consecutive
408    }
409
410    /// Record a success, resetting the counter.
411    pub fn record_success(&mut self) {
412        self.count = 0;
413    }
414
415    /// Whether the loop should pause due to too many consecutive errors.
416    pub fn should_pause(&self) -> bool {
417        self.count >= self.max_consecutive
418    }
419
420    /// How long to pause.
421    pub fn pause_duration(&self) -> Duration {
422        self.pause_duration
423    }
424
425    /// Current consecutive error count.
426    pub fn count(&self) -> u32 {
427        self.count
428    }
429
430    /// Reset the counter.
431    pub fn reset(&mut self) {
432        self.count = 0;
433    }
434}
435
436/// Compute a backoff duration for rate limit errors.
437///
438/// Uses the `retry_after` hint if available, otherwise exponential
439/// backoff starting at 60s, capped at 15 minutes.
440pub fn rate_limit_backoff(retry_after: Option<u64>, attempt: u32) -> Duration {
441    if let Some(secs) = retry_after {
442        Duration::from_secs(secs)
443    } else {
444        let base = 60u64;
445        let exp = base.saturating_mul(2u64.saturating_pow(attempt));
446        Duration::from_secs(exp.min(900)) // cap at 15 minutes
447    }
448}
449
450// ============================================================================
451// Retry + Dead-Letter Support (Task C2)
452// ============================================================================
453
454/// Classify an error as transient (retryable) or permanent (dead-letter).
455///
456/// Transient errors: 429 (rate limit), 5xx (server), timeout, connection reset
457/// Permanent errors: 401 (auth), validation, not found, bad request
458pub fn is_transient_error(error_msg: &str) -> bool {
459    let lower = error_msg.to_lowercase();
460    // Rate limit, server error, timeout, connection issues
461    lower.contains("429")
462        || lower.contains("500")
463        || lower.contains("502")
464        || lower.contains("503")
465        || lower.contains("504")
466        || lower.contains("timeout")
467        || lower.contains("timed out")
468        || lower.contains("connection reset")
469        || lower.contains("connection refused")
470        || lower.contains("try again")
471}
472
473/// Compute exponential backoff duration for thread retry.
474///
475/// Base: 30 seconds, doubled per attempt, max 3 retries.
476/// Caps at 3+ minutes (generous for transient failures).
477pub fn thread_retry_backoff(retry_count: u32) -> Duration {
478    let base_secs = 30u64;
479    let exp = base_secs.saturating_mul(2u64.saturating_pow(retry_count));
480    Duration::from_secs(exp.min(300)) // cap at 5 minutes
481}
482
483#[cfg(test)]
484mod tests {
485    use super::*;
486
487    #[test]
488    fn error_tracker_records_errors() {
489        let mut tracker = ConsecutiveErrorTracker::new(3, Duration::from_secs(300));
490        assert!(!tracker.should_pause());
491        assert_eq!(tracker.count(), 0);
492
493        assert!(!tracker.record_error()); // 1
494        assert!(!tracker.record_error()); // 2
495        assert!(tracker.record_error()); // 3 -- should pause
496        assert!(tracker.should_pause());
497        assert_eq!(tracker.count(), 3);
498    }
499
500    #[test]
501    fn error_tracker_resets_on_success() {
502        let mut tracker = ConsecutiveErrorTracker::new(3, Duration::from_secs(300));
503        tracker.record_error();
504        tracker.record_error();
505        tracker.record_success();
506        assert_eq!(tracker.count(), 0);
507        assert!(!tracker.should_pause());
508    }
509
510    #[test]
511    fn error_tracker_pause_duration() {
512        let tracker = ConsecutiveErrorTracker::new(5, Duration::from_secs(120));
513        assert_eq!(tracker.pause_duration(), Duration::from_secs(120));
514    }
515
516    #[test]
517    fn rate_limit_backoff_with_retry_after() {
518        assert_eq!(rate_limit_backoff(Some(30), 0), Duration::from_secs(30));
519        assert_eq!(rate_limit_backoff(Some(120), 5), Duration::from_secs(120));
520    }
521
522    #[test]
523    fn rate_limit_backoff_exponential() {
524        assert_eq!(rate_limit_backoff(None, 0), Duration::from_secs(60));
525        assert_eq!(rate_limit_backoff(None, 1), Duration::from_secs(120));
526        assert_eq!(rate_limit_backoff(None, 2), Duration::from_secs(240));
527    }
528
529    #[test]
530    fn rate_limit_backoff_capped_at_15_minutes() {
531        assert_eq!(rate_limit_backoff(None, 10), Duration::from_secs(900));
532    }
533
534    #[test]
535    fn loop_error_display() {
536        let err = LoopError::RateLimited {
537            retry_after: Some(30),
538        };
539        assert_eq!(err.to_string(), "rate limited, retry after 30s");
540
541        let err = LoopError::AuthExpired;
542        assert_eq!(err.to_string(), "authentication expired");
543
544        let err = LoopError::LlmFailure("timeout".to_string());
545        assert_eq!(err.to_string(), "LLM failure: timeout");
546    }
547
548    #[test]
549    fn loop_tweet_debug() {
550        let tweet = LoopTweet {
551            id: "123".to_string(),
552            text: "hello".to_string(),
553            author_id: "uid_123".to_string(),
554            author_username: "user".to_string(),
555            author_followers: 1000,
556            created_at: "2026-01-01T00:00:00Z".to_string(),
557            likes: 10,
558            retweets: 2,
559            replies: 1,
560        };
561        let debug = format!("{tweet:?}");
562        assert!(debug.contains("123"));
563    }
564
565    #[test]
566    fn reply_output_creation() {
567        let output = ReplyOutput {
568            text: "Great point!".to_string(),
569            vault_citations: vec![],
570        };
571        assert_eq!(output.text, "Great point!");
572        assert!(output.vault_citations.is_empty());
573    }
574
575    #[test]
576    fn content_loop_error_display() {
577        let err = ContentLoopError::LlmFailure("model down".to_string());
578        assert_eq!(err.to_string(), "LLM failure: model down");
579
580        let err = ContentLoopError::PostFailed("429".to_string());
581        assert_eq!(err.to_string(), "Post failed: 429");
582
583        let err = ContentLoopError::StorageError("disk full".to_string());
584        assert_eq!(err.to_string(), "Storage error: disk full");
585
586        let err = ContentLoopError::NetworkError("timeout".to_string());
587        assert_eq!(err.to_string(), "Network error: timeout");
588
589        let err = ContentLoopError::Other("unknown".to_string());
590        assert_eq!(err.to_string(), "unknown");
591    }
592
593    #[test]
594    fn error_tracker_manual_reset() {
595        let mut tracker = ConsecutiveErrorTracker::new(3, Duration::from_secs(300));
596        tracker.record_error();
597        tracker.record_error();
598        assert_eq!(tracker.count(), 2);
599        tracker.reset();
600        assert_eq!(tracker.count(), 0);
601        assert!(!tracker.should_pause());
602    }
603
604    #[test]
605    fn loop_error_display_remaining_variants() {
606        let err = LoopError::RateLimited { retry_after: None };
607        assert_eq!(err.to_string(), "rate limited");
608
609        let err = LoopError::NetworkError("timeout".to_string());
610        assert_eq!(err.to_string(), "network error: timeout");
611
612        let err = LoopError::StorageError("disk full".to_string());
613        assert_eq!(err.to_string(), "storage error: disk full");
614
615        let err = LoopError::Other("unknown".to_string());
616        assert_eq!(err.to_string(), "unknown");
617    }
618
619    #[test]
620    fn rate_limit_backoff_very_high_attempt() {
621        assert_eq!(rate_limit_backoff(None, 100), Duration::from_secs(900));
622    }
623
624    #[test]
625    fn consecutive_error_tracker_new_state() {
626        let tracker = ConsecutiveErrorTracker::new(5, Duration::from_secs(60));
627        assert_eq!(tracker.count(), 0);
628        assert!(!tracker.should_pause());
629        assert_eq!(tracker.pause_duration(), Duration::from_secs(60));
630    }
631
632    #[test]
633    fn score_result_debug() {
634        let sr = ScoreResult {
635            total: 75.0,
636            meets_threshold: true,
637            matched_keywords: vec!["rust".to_string()],
638        };
639        let debug = format!("{sr:?}");
640        assert!(debug.contains("75"));
641        assert!(debug.contains("rust"));
642    }
643}