Skip to main content

tuitbot_core/automation/
discovery_loop.rs

1//! Tweet discovery loop.
2//!
3//! Searches X using configured keywords, scores each tweet with the
4//! scoring engine, filters by threshold, generates replies for
5//! qualifying tweets, and posts them through the posting queue.
6//! Rotates keywords across iterations to distribute API usage.
7
8use super::loop_helpers::{
9    ConsecutiveErrorTracker, LoopError, LoopStorage, LoopTweet, PostSender, ReplyGenerator,
10    SafetyChecker, TweetScorer, TweetSearcher,
11};
12use super::schedule::{schedule_gate, ActiveSchedule};
13use super::scheduler::LoopScheduler;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio_util::sync::CancellationToken;
17
18/// Discovery loop that finds and replies to relevant tweets.
19pub struct DiscoveryLoop {
20    searcher: Arc<dyn TweetSearcher>,
21    scorer: Arc<dyn TweetScorer>,
22    generator: Arc<dyn ReplyGenerator>,
23    safety: Arc<dyn SafetyChecker>,
24    storage: Arc<dyn LoopStorage>,
25    poster: Arc<dyn PostSender>,
26    keywords: Vec<String>,
27    threshold: f32,
28    dry_run: bool,
29}
30
/// Result of processing a single discovered tweet.
#[derive(Debug, Clone, PartialEq)]
pub enum DiscoveryResult {
    /// Reply was sent (or would be sent in dry-run).
    Replied {
        tweet_id: String,
        author: String,
        score: f32,
        reply_text: String,
    },
    /// Tweet scored below threshold.
    BelowThreshold { tweet_id: String, score: f32 },
    /// Tweet was skipped without replying; `reason` names why (for example
    /// the tweet was already discovered, already replied to, or the reply
    /// rate limit was reached).
    Skipped { tweet_id: String, reason: String },
    /// Processing failed for this tweet (e.g. reply generation or sending);
    /// `error` holds the error message.
    Failed { tweet_id: String, error: String },
}

impl DiscoveryResult {
    /// ID of the tweet this result refers to.
    pub fn tweet_id(&self) -> &str {
        match self {
            DiscoveryResult::Replied { tweet_id, .. }
            | DiscoveryResult::BelowThreshold { tweet_id, .. }
            | DiscoveryResult::Skipped { tweet_id, .. }
            | DiscoveryResult::Failed { tweet_id, .. } => tweet_id,
        }
    }

    /// Score recorded in this result, if the variant carries one.
    ///
    /// `Skipped` and `Failed` results do not record a score and return `None`.
    pub fn score(&self) -> Option<f32> {
        match self {
            DiscoveryResult::Replied { score, .. }
            | DiscoveryResult::BelowThreshold { score, .. } => Some(*score),
            DiscoveryResult::Skipped { .. } | DiscoveryResult::Failed { .. } => None,
        }
    }
}
48
/// Summary of a discovery iteration.
///
/// The counters are not disjoint: `qualifying` describes how a tweet scored,
/// while `replied`, `skipped`, and `failed` describe what happened to it.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DiscoverySummary {
    /// Total tweets found across all keywords searched (including any not
    /// processed because of a limit).
    pub tweets_found: usize,
    /// Tweets whose score met the threshold. Such a tweet may also be counted
    /// in `replied`, `skipped`, or `failed`.
    pub qualifying: usize,
    /// Replies sent (or would be sent in dry-run).
    pub replied: usize,
    /// Tweets skipped (safety, dedup, below threshold).
    pub skipped: usize,
    /// Tweets that failed processing.
    pub failed: usize,
}
63
64impl DiscoveryLoop {
65    /// Create a new discovery loop.
66    #[allow(clippy::too_many_arguments)]
67    pub fn new(
68        searcher: Arc<dyn TweetSearcher>,
69        scorer: Arc<dyn TweetScorer>,
70        generator: Arc<dyn ReplyGenerator>,
71        safety: Arc<dyn SafetyChecker>,
72        storage: Arc<dyn LoopStorage>,
73        poster: Arc<dyn PostSender>,
74        keywords: Vec<String>,
75        threshold: f32,
76        dry_run: bool,
77    ) -> Self {
78        Self {
79            searcher,
80            scorer,
81            generator,
82            safety,
83            storage,
84            poster,
85            keywords,
86            threshold,
87            dry_run,
88        }
89    }
90
91    /// Run the continuous discovery loop until cancellation.
92    ///
93    /// Rotates through keywords across iterations to distribute API usage.
94    pub async fn run(
95        &self,
96        cancel: CancellationToken,
97        scheduler: LoopScheduler,
98        schedule: Option<Arc<ActiveSchedule>>,
99    ) {
100        tracing::info!(
101            dry_run = self.dry_run,
102            keywords = self.keywords.len(),
103            threshold = self.threshold,
104            "Discovery loop started"
105        );
106
107        if self.keywords.is_empty() {
108            tracing::warn!("No keywords configured, discovery loop has nothing to search");
109            cancel.cancelled().await;
110            return;
111        }
112
113        let mut error_tracker = ConsecutiveErrorTracker::new(10, Duration::from_secs(300));
114        let mut keyword_index = 0usize;
115
116        loop {
117            if cancel.is_cancelled() {
118                break;
119            }
120
121            if !schedule_gate(&schedule, &cancel).await {
122                break;
123            }
124
125            // Select next keyword (round-robin)
126            let keyword = &self.keywords[keyword_index % self.keywords.len()];
127            keyword_index += 1;
128
129            match self.search_and_process(keyword, None).await {
130                Ok((_results, summary)) => {
131                    error_tracker.record_success();
132                    if summary.tweets_found > 0 {
133                        tracing::info!(
134                            keyword = %keyword,
135                            found = summary.tweets_found,
136                            qualifying = summary.qualifying,
137                            replied = summary.replied,
138                            "Discovery iteration complete"
139                        );
140                    }
141                }
142                Err(e) => {
143                    let should_pause = error_tracker.record_error();
144                    tracing::warn!(
145                        keyword = %keyword,
146                        error = %e,
147                        consecutive_errors = error_tracker.count(),
148                        "Discovery iteration failed"
149                    );
150
151                    if should_pause {
152                        tracing::warn!(
153                            pause_secs = error_tracker.pause_duration().as_secs(),
154                            "Pausing discovery loop due to consecutive errors"
155                        );
156                        tokio::select! {
157                            _ = cancel.cancelled() => break,
158                            _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
159                        }
160                        error_tracker.reset();
161                        continue;
162                    }
163
164                    if let LoopError::RateLimited { retry_after } = &e {
165                        let backoff = super::loop_helpers::rate_limit_backoff(*retry_after, 0);
166                        tokio::select! {
167                            _ = cancel.cancelled() => break,
168                            _ = tokio::time::sleep(backoff) => {},
169                        }
170                        continue;
171                    }
172                }
173            }
174
175            tokio::select! {
176                _ = cancel.cancelled() => break,
177                _ = scheduler.tick() => {},
178            }
179        }
180
181        tracing::info!("Discovery loop stopped");
182    }
183
184    /// Run a single-shot discovery across all keywords.
185    ///
186    /// Used by the CLI `tuitbot discover` command. Searches all keywords
187    /// (not rotating) and returns all results sorted by score descending.
188    pub async fn run_once(
189        &self,
190        limit: Option<usize>,
191    ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
192        let mut all_results = Vec::new();
193        let mut summary = DiscoverySummary::default();
194        let mut total_processed = 0usize;
195
196        for keyword in &self.keywords {
197            if let Some(max) = limit {
198                if total_processed >= max {
199                    break;
200                }
201            }
202
203            let remaining = limit.map(|max| max.saturating_sub(total_processed));
204            match self.search_and_process(keyword, remaining).await {
205                Ok((results, iter_summary)) => {
206                    summary.tweets_found += iter_summary.tweets_found;
207                    summary.qualifying += iter_summary.qualifying;
208                    summary.replied += iter_summary.replied;
209                    summary.skipped += iter_summary.skipped;
210                    summary.failed += iter_summary.failed;
211                    total_processed += iter_summary.tweets_found;
212                    all_results.extend(results);
213                }
214                Err(e) => {
215                    tracing::warn!(keyword = %keyword, error = %e, "Search failed for keyword");
216                }
217            }
218        }
219
220        Ok((all_results, summary))
221    }
222
223    /// Search for a single keyword and process all results.
224    async fn search_and_process(
225        &self,
226        keyword: &str,
227        limit: Option<usize>,
228    ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
229        tracing::info!(keyword = %keyword, "Searching keyword");
230        let tweets = self.searcher.search_tweets(keyword).await?;
231
232        let mut summary = DiscoverySummary {
233            tweets_found: tweets.len(),
234            ..Default::default()
235        };
236
237        let to_process = match limit {
238            Some(n) => &tweets[..tweets.len().min(n)],
239            None => &tweets,
240        };
241
242        let mut results = Vec::with_capacity(to_process.len());
243
244        for tweet in to_process {
245            let result = self.process_tweet(tweet, keyword).await;
246
247            match &result {
248                DiscoveryResult::Replied { .. } => {
249                    summary.qualifying += 1;
250                    summary.replied += 1;
251                }
252                DiscoveryResult::BelowThreshold { .. } => {
253                    summary.skipped += 1;
254                }
255                DiscoveryResult::Skipped { .. } => {
256                    summary.skipped += 1;
257                }
258                DiscoveryResult::Failed { .. } => {
259                    summary.failed += 1;
260                }
261            }
262
263            results.push(result);
264        }
265
266        Ok((results, summary))
267    }
268
269    /// Process a single discovered tweet: dedup, score, generate reply, post.
270    async fn process_tweet(&self, tweet: &LoopTweet, keyword: &str) -> DiscoveryResult {
271        // Check if already discovered (dedup)
272        match self.storage.tweet_exists(&tweet.id).await {
273            Ok(true) => {
274                tracing::debug!(tweet_id = %tweet.id, "Tweet already discovered, skipping");
275                return DiscoveryResult::Skipped {
276                    tweet_id: tweet.id.clone(),
277                    reason: "already discovered".to_string(),
278                };
279            }
280            Ok(false) => {}
281            Err(e) => {
282                tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to check tweet existence");
283                // Continue anyway -- best effort dedup
284            }
285        }
286
287        // Score the tweet
288        let score_result = self.scorer.score(tweet);
289
290        // Store discovered tweet (even if below threshold, useful for analytics)
291        if let Err(e) = self
292            .storage
293            .store_discovered_tweet(tweet, score_result.total, keyword)
294            .await
295        {
296            tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to store discovered tweet");
297        }
298
299        // Check threshold
300        if !score_result.meets_threshold {
301            tracing::debug!(
302                tweet_id = %tweet.id,
303                score = score_result.total,
304                threshold = self.threshold,
305                "Tweet scored below threshold, skipping"
306            );
307            return DiscoveryResult::BelowThreshold {
308                tweet_id: tweet.id.clone(),
309                score: score_result.total,
310            };
311        }
312
313        // Safety checks
314        if self.safety.has_replied_to(&tweet.id).await {
315            return DiscoveryResult::Skipped {
316                tweet_id: tweet.id.clone(),
317                reason: "already replied".to_string(),
318            };
319        }
320
321        if !self.safety.can_reply().await {
322            return DiscoveryResult::Skipped {
323                tweet_id: tweet.id.clone(),
324                reason: "rate limited".to_string(),
325            };
326        }
327
328        // Generate reply (product mention decided by caller or random)
329        let reply_text = match self
330            .generator
331            .generate_reply(&tweet.text, &tweet.author_username, true)
332            .await
333        {
334            Ok(text) => text,
335            Err(e) => {
336                tracing::error!(
337                    tweet_id = %tweet.id,
338                    error = %e,
339                    "Failed to generate reply"
340                );
341                return DiscoveryResult::Failed {
342                    tweet_id: tweet.id.clone(),
343                    error: e.to_string(),
344                };
345            }
346        };
347
348        tracing::info!(
349            author = %tweet.author_username,
350            score = format!("{:.0}", score_result.total),
351            "Posted reply to @{}",
352            tweet.author_username,
353        );
354
355        if self.dry_run {
356            tracing::info!(
357                "DRY RUN: Tweet {} by @{} scored {:.0}/100 -- Would reply: \"{}\"",
358                tweet.id,
359                tweet.author_username,
360                score_result.total,
361                reply_text
362            );
363
364            let _ = self
365                .storage
366                .log_action(
367                    "discovery_reply",
368                    "dry_run",
369                    &format!(
370                        "Score {:.0}, reply to @{}: {}",
371                        score_result.total,
372                        tweet.author_username,
373                        truncate(&reply_text, 50)
374                    ),
375                )
376                .await;
377        } else {
378            if let Err(e) = self.poster.send_reply(&tweet.id, &reply_text).await {
379                tracing::error!(tweet_id = %tweet.id, error = %e, "Failed to send reply");
380                return DiscoveryResult::Failed {
381                    tweet_id: tweet.id.clone(),
382                    error: e.to_string(),
383                };
384            }
385
386            if let Err(e) = self.safety.record_reply(&tweet.id, &reply_text).await {
387                tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to record reply");
388            }
389
390            let _ = self
391                .storage
392                .log_action(
393                    "discovery_reply",
394                    "success",
395                    &format!(
396                        "Score {:.0}, replied to @{}: {}",
397                        score_result.total,
398                        tweet.author_username,
399                        truncate(&reply_text, 50)
400                    ),
401                )
402                .await;
403        }
404
405        DiscoveryResult::Replied {
406            tweet_id: tweet.id.clone(),
407            author: tweet.author_username.clone(),
408            score: score_result.total,
409            reply_text,
410        }
411    }
412}
413
/// Truncate a string for display.
///
/// Keeps at most `max_len` characters (Unicode scalar values, not bytes) and
/// appends `"..."` when anything was cut. The cut always lands on a character
/// boundary, so multi-byte text such as emoji never causes a panic.
fn truncate(s: &str, max_len: usize) -> String {
    match s.char_indices().nth(max_len) {
        // Fewer than `max_len + 1` characters: nothing to cut.
        None => s.to_string(),
        // `idx` is the byte offset where character number `max_len` starts.
        Some((idx, _)) => format!("{}...", &s[..idx]),
    }
}
422
#[cfg(test)]
mod tests {
    use super::*;
    use crate::automation::ScoreResult;
    use std::sync::Mutex;

    // --- Mock implementations ---

    /// Searcher that returns the same fixed tweets for every query.
    struct MockSearcher {
        results: Vec<LoopTweet>,
    }

    #[async_trait::async_trait]
    impl TweetSearcher for MockSearcher {
        async fn search_tweets(&self, _query: &str) -> Result<Vec<LoopTweet>, LoopError> {
            Ok(self.results.clone())
        }
    }

    /// Searcher whose every query fails with a rate-limit error.
    struct FailingSearcher;

    #[async_trait::async_trait]
    impl TweetSearcher for FailingSearcher {
        async fn search_tweets(&self, _query: &str) -> Result<Vec<LoopTweet>, LoopError> {
            Err(LoopError::RateLimited {
                retry_after: Some(60),
            })
        }
    }

    /// Scorer that returns a fixed score and threshold verdict.
    struct MockScorer {
        score: f32,
        meets_threshold: bool,
    }

    impl TweetScorer for MockScorer {
        fn score(&self, _tweet: &LoopTweet) -> ScoreResult {
            ScoreResult {
                total: self.score,
                meets_threshold: self.meets_threshold,
                matched_keywords: vec!["test".to_string()],
            }
        }
    }

    /// Generator that always returns the same reply text.
    struct MockGenerator {
        reply: String,
    }

    #[async_trait::async_trait]
    impl ReplyGenerator for MockGenerator {
        async fn generate_reply(
            &self,
            _tweet_text: &str,
            _author: &str,
            _mention_product: bool,
        ) -> Result<String, LoopError> {
            Ok(self.reply.clone())
        }
    }

    /// Safety checker with a fixed `can_reply` answer that remembers which
    /// tweet IDs it was told were replied to.
    struct MockSafety {
        can_reply: bool,
        replied_ids: Mutex<Vec<String>>,
    }

    impl MockSafety {
        fn new(can_reply: bool) -> Self {
            Self {
                can_reply,
                replied_ids: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait::async_trait]
    impl SafetyChecker for MockSafety {
        async fn can_reply(&self) -> bool {
            self.can_reply
        }
        async fn has_replied_to(&self, tweet_id: &str) -> bool {
            self.replied_ids
                .lock()
                .expect("lock")
                .contains(&tweet_id.to_string())
        }
        async fn record_reply(&self, tweet_id: &str, _content: &str) -> Result<(), LoopError> {
            self.replied_ids
                .lock()
                .expect("lock")
                .push(tweet_id.to_string());
            Ok(())
        }
    }

    /// In-memory storage recording discovered tweets and logged actions.
    /// `existing_ids` is only populated by tests (to simulate prior discovery).
    struct MockStorage {
        existing_ids: Mutex<Vec<String>>,
        discovered: Mutex<Vec<String>>,
        actions: Mutex<Vec<(String, String, String)>>,
    }

    impl MockStorage {
        fn new() -> Self {
            Self {
                existing_ids: Mutex::new(Vec::new()),
                discovered: Mutex::new(Vec::new()),
                actions: Mutex::new(Vec::new()),
            }
        }
    }

    #[async_trait::async_trait]
    impl LoopStorage for MockStorage {
        async fn get_cursor(&self, _key: &str) -> Result<Option<String>, LoopError> {
            Ok(None)
        }
        async fn set_cursor(&self, _key: &str, _value: &str) -> Result<(), LoopError> {
            Ok(())
        }
        async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
            Ok(self
                .existing_ids
                .lock()
                .expect("lock")
                .contains(&tweet_id.to_string()))
        }
        async fn store_discovered_tweet(
            &self,
            tweet: &LoopTweet,
            _score: f32,
            _keyword: &str,
        ) -> Result<(), LoopError> {
            self.discovered.lock().expect("lock").push(tweet.id.clone());
            Ok(())
        }
        async fn log_action(
            &self,
            action_type: &str,
            status: &str,
            message: &str,
        ) -> Result<(), LoopError> {
            self.actions.lock().expect("lock").push((
                action_type.to_string(),
                status.to_string(),
                message.to_string(),
            ));
            Ok(())
        }
    }

    /// Poster that records every `(tweet_id, content)` it is asked to send.
    struct MockPoster {
        sent: Mutex<Vec<(String, String)>>,
    }

    impl MockPoster {
        fn new() -> Self {
            Self {
                sent: Mutex::new(Vec::new()),
            }
        }
        fn sent_count(&self) -> usize {
            self.sent.lock().expect("lock").len()
        }
    }

    #[async_trait::async_trait]
    impl PostSender for MockPoster {
        async fn send_reply(&self, tweet_id: &str, content: &str) -> Result<(), LoopError> {
            self.sent
                .lock()
                .expect("lock")
                .push((tweet_id.to_string(), content.to_string()));
            Ok(())
        }
    }

    fn test_tweet(id: &str, author: &str) -> LoopTweet {
        LoopTweet {
            id: id.to_string(),
            text: format!("Test tweet about rust from @{author}"),
            author_id: format!("uid_{author}"),
            author_username: author.to_string(),
            author_followers: 5000,
            created_at: "2026-01-01T00:00:00Z".to_string(),
            likes: 20,
            retweets: 5,
            replies: 3,
        }
    }

    /// Build a loop over keywords `["rust", "cli"]` whose safety checker allows replies.
    fn build_loop(
        tweets: Vec<LoopTweet>,
        score: f32,
        meets_threshold: bool,
        dry_run: bool,
    ) -> (DiscoveryLoop, Arc<MockPoster>, Arc<MockStorage>) {
        build_loop_with_safety(tweets, score, meets_threshold, dry_run, true)
    }

    /// Like `build_loop`, but lets the test choose the safety checker's `can_reply` answer.
    fn build_loop_with_safety(
        tweets: Vec<LoopTweet>,
        score: f32,
        meets_threshold: bool,
        dry_run: bool,
        can_reply: bool,
    ) -> (DiscoveryLoop, Arc<MockPoster>, Arc<MockStorage>) {
        let poster = Arc::new(MockPoster::new());
        let storage = Arc::new(MockStorage::new());
        let discovery = DiscoveryLoop::new(
            Arc::new(MockSearcher { results: tweets }),
            Arc::new(MockScorer {
                score,
                meets_threshold,
            }),
            Arc::new(MockGenerator {
                reply: "Great insight!".to_string(),
            }),
            Arc::new(MockSafety::new(can_reply)),
            storage.clone(),
            poster.clone(),
            vec!["rust".to_string(), "cli".to_string()],
            70.0,
            dry_run,
        );
        (discovery, poster, storage)
    }

    // --- Tests ---

    #[tokio::test]
    async fn search_and_process_no_results() {
        let (discovery, poster, _) = build_loop(Vec::new(), 80.0, true, false);
        let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();
        assert_eq!(summary.tweets_found, 0);
        assert!(results.is_empty());
        assert_eq!(poster.sent_count(), 0);
    }

    #[tokio::test]
    async fn search_and_process_above_threshold() {
        let tweets = vec![test_tweet("100", "alice"), test_tweet("101", "bob")];
        let (discovery, poster, storage) = build_loop(tweets, 85.0, true, false);

        let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();

        assert_eq!(summary.tweets_found, 2);
        assert_eq!(summary.replied, 2);
        assert_eq!(results.len(), 2);
        assert_eq!(poster.sent_count(), 2);

        // Both tweets should be stored as discovered
        let discovered = storage.discovered.lock().expect("lock");
        assert_eq!(discovered.len(), 2);
    }

    #[tokio::test]
    async fn search_and_process_below_threshold() {
        let tweets = vec![test_tweet("100", "alice")];
        let (discovery, poster, storage) = build_loop(tweets, 40.0, false, false);

        let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();

        assert_eq!(summary.tweets_found, 1);
        assert_eq!(summary.skipped, 1);
        assert_eq!(summary.replied, 0);
        assert_eq!(results.len(), 1);
        assert_eq!(poster.sent_count(), 0);

        // Tweet should still be stored as discovered (for analytics)
        let discovered = storage.discovered.lock().expect("lock");
        assert_eq!(discovered.len(), 1);
    }

    #[tokio::test]
    async fn search_and_process_dry_run() {
        let tweets = vec![test_tweet("100", "alice")];
        let (discovery, poster, _) = build_loop(tweets, 85.0, true, true);

        let (_results, summary) = discovery.search_and_process("rust", None).await.unwrap();

        assert_eq!(summary.replied, 1);
        // Should NOT post in dry-run
        assert_eq!(poster.sent_count(), 0);
    }

    #[tokio::test]
    async fn search_and_process_skips_existing() {
        let tweets = vec![test_tweet("100", "alice")];
        let poster = Arc::new(MockPoster::new());
        let storage = Arc::new(MockStorage::new());
        // Pre-mark tweet as existing
        storage
            .existing_ids
            .lock()
            .expect("lock")
            .push("100".to_string());

        let discovery = DiscoveryLoop::new(
            Arc::new(MockSearcher { results: tweets }),
            Arc::new(MockScorer {
                score: 85.0,
                meets_threshold: true,
            }),
            Arc::new(MockGenerator {
                reply: "Great!".to_string(),
            }),
            Arc::new(MockSafety::new(true)),
            storage,
            poster.clone(),
            vec!["rust".to_string()],
            70.0,
            false,
        );

        let (_results, summary) = discovery.search_and_process("rust", None).await.unwrap();
        assert_eq!(summary.skipped, 1);
        assert_eq!(poster.sent_count(), 0);
    }

    #[tokio::test]
    async fn search_and_process_skips_when_safety_blocks() {
        let tweets = vec![test_tweet("100", "alice")];
        let (discovery, poster, _) = build_loop_with_safety(tweets, 85.0, true, false, false);

        let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();

        assert_eq!(summary.skipped, 1);
        assert_eq!(summary.replied, 0);
        assert_eq!(poster.sent_count(), 0);
        assert!(matches!(
            &results[0],
            DiscoveryResult::Skipped { reason, .. } if reason == "rate limited"
        ));
    }

    #[tokio::test]
    async fn search_and_process_does_not_reply_twice() {
        let tweets = vec![test_tweet("100", "alice")];
        let (discovery, poster, _) = build_loop(tweets, 85.0, true, false);

        discovery.search_and_process("rust", None).await.unwrap();
        let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();

        assert_eq!(summary.replied, 0);
        assert_eq!(summary.skipped, 1);
        assert_eq!(poster.sent_count(), 1);
        assert!(matches!(
            &results[0],
            DiscoveryResult::Skipped { reason, .. } if reason == "already replied"
        ));
    }

    #[tokio::test]
    async fn dry_run_logs_action_without_posting() {
        let tweets = vec![test_tweet("100", "alice")];
        let (discovery, poster, storage) = build_loop(tweets, 85.0, true, true);

        discovery.search_and_process("rust", None).await.unwrap();

        assert_eq!(poster.sent_count(), 0);
        let actions = storage.actions.lock().expect("lock");
        assert_eq!(actions.len(), 1);
        assert_eq!(actions[0].0, "discovery_reply");
        assert_eq!(actions[0].1, "dry_run");
    }

    #[tokio::test]
    async fn live_reply_logs_success_action() {
        let tweets = vec![test_tweet("100", "alice")];
        let (discovery, poster, storage) = build_loop(tweets, 85.0, true, false);

        discovery.search_and_process("rust", None).await.unwrap();

        assert_eq!(poster.sent_count(), 1);
        let actions = storage.actions.lock().expect("lock");
        assert_eq!(actions.len(), 1);
        assert_eq!(actions[0].0, "discovery_reply");
        assert_eq!(actions[0].1, "success");
    }

    #[tokio::test]
    async fn search_and_process_respects_limit() {
        let tweets = vec![
            test_tweet("100", "alice"),
            test_tweet("101", "bob"),
            test_tweet("102", "carol"),
        ];
        let (discovery, poster, _) = build_loop(tweets, 85.0, true, false);

        let (results, summary) = discovery.search_and_process("rust", Some(2)).await.unwrap();

        assert_eq!(summary.tweets_found, 3); // found 3, but...
        assert_eq!(results.len(), 2); // only 2 results returned
        assert_eq!(poster.sent_count(), 2); // only processed 2
    }

    #[tokio::test]
    async fn run_once_searches_all_keywords() {
        let tweets = vec![test_tweet("100", "alice")];
        let (discovery, _, _) = build_loop(tweets, 85.0, true, false);

        let (_, summary) = discovery.run_once(None).await.unwrap();
        // Should search both "rust" and "cli" keywords
        assert_eq!(summary.tweets_found, 2); // 1 tweet per keyword
    }

    #[tokio::test]
    async fn run_once_respects_total_limit() {
        let tweets = vec![test_tweet("100", "alice"), test_tweet("101", "bob")];
        let (discovery, poster, _) = build_loop(tweets, 85.0, true, false);

        let (results, _) = discovery.run_once(Some(1)).await.unwrap();

        // The limit applies across keywords, so the second keyword is never processed.
        assert_eq!(results.len(), 1);
        assert_eq!(poster.sent_count(), 1);
    }

    #[tokio::test]
    async fn search_error_returns_loop_error() {
        let poster = Arc::new(MockPoster::new());
        let storage = Arc::new(MockStorage::new());
        let discovery = DiscoveryLoop::new(
            Arc::new(FailingSearcher),
            Arc::new(MockScorer {
                score: 85.0,
                meets_threshold: true,
            }),
            Arc::new(MockGenerator {
                reply: "test".to_string(),
            }),
            Arc::new(MockSafety::new(true)),
            storage,
            poster,
            vec!["rust".to_string()],
            70.0,
            false,
        );

        let result = discovery.search_and_process("rust", None).await;
        assert!(result.is_err());
    }

    #[test]
    fn truncate_ascii() {
        assert_eq!(truncate("short", 50), "short");
        assert_eq!(truncate("hello", 5), "hello");
        assert_eq!(truncate("hello world", 5), "hello...");
    }
}