Skip to main content

tuitbot_core/automation/discovery_loop/
mod.rs

1//! Tweet discovery loop.
2//!
3//! Searches X using configured keywords, scores each tweet with the
4//! scoring engine, filters by threshold, generates replies for
5//! qualifying tweets, and posts them through the posting queue.
6//! Rotates keywords across iterations to distribute API usage.
7
8use super::loop_helpers::{
9    ConsecutiveErrorTracker, LoopError, LoopStorage, LoopTweet, PostSender, ReplyGenerator,
10    SafetyChecker, TweetScorer, TweetSearcher,
11};
12use super::schedule::{schedule_gate, ActiveSchedule};
13use super::scheduler::LoopScheduler;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio_util::sync::CancellationToken;
17
18/// Discovery loop that finds and replies to relevant tweets.
19pub struct DiscoveryLoop {
20    searcher: Arc<dyn TweetSearcher>,
21    scorer: Arc<dyn TweetScorer>,
22    generator: Arc<dyn ReplyGenerator>,
23    safety: Arc<dyn SafetyChecker>,
24    storage: Arc<dyn LoopStorage>,
25    poster: Arc<dyn PostSender>,
26    keywords: Vec<String>,
27    threshold: f32,
28    dry_run: bool,
29}
30
/// Outcome of running one discovered tweet through the discovery pipeline.
#[derive(Debug)]
pub enum DiscoveryResult {
    /// A reply was sent — or, in dry-run mode, would have been sent.
    Replied {
        /// ID of the tweet that was replied to.
        tweet_id: String,
        /// Username of the tweet's author.
        author: String,
        /// Score the tweet received.
        score: f32,
        /// Text of the generated reply.
        reply_text: String,
    },
    /// The tweet's score did not meet the threshold.
    BelowThreshold {
        /// ID of the scored tweet.
        tweet_id: String,
        /// Score the tweet received.
        score: f32,
    },
    /// Processing stopped early, e.g. a duplicate tweet or a safety check.
    Skipped {
        /// ID of the skipped tweet.
        tweet_id: String,
        /// Human-readable reason for skipping.
        reason: String,
    },
    /// Processing hit an error for this tweet.
    Failed {
        /// ID of the tweet that failed.
        tweet_id: String,
        /// Rendered error message.
        error: String,
    },
}
48
/// Aggregate counters for one discovery pass.
///
/// All counters start at zero (`Default`).
#[derive(Debug)]
#[derive(Default)]
pub struct DiscoverySummary {
    /// Number of tweets returned by search, across every keyword searched.
    pub tweets_found: usize,
    /// Number of tweets whose score met the relevance threshold.
    pub qualifying: usize,
    /// Number of replies sent (or that would have been sent in dry-run mode).
    pub replied: usize,
    /// Number of tweets skipped: duplicates, safety checks, or below threshold.
    pub skipped: usize,
    /// Number of tweets whose processing failed.
    pub failed: usize,
}
63
64impl DiscoveryLoop {
65    /// Create a new discovery loop.
66    #[allow(clippy::too_many_arguments)]
67    pub fn new(
68        searcher: Arc<dyn TweetSearcher>,
69        scorer: Arc<dyn TweetScorer>,
70        generator: Arc<dyn ReplyGenerator>,
71        safety: Arc<dyn SafetyChecker>,
72        storage: Arc<dyn LoopStorage>,
73        poster: Arc<dyn PostSender>,
74        keywords: Vec<String>,
75        threshold: f32,
76        dry_run: bool,
77    ) -> Self {
78        Self {
79            searcher,
80            scorer,
81            generator,
82            safety,
83            storage,
84            poster,
85            keywords,
86            threshold,
87            dry_run,
88        }
89    }
90
91    /// Run the continuous discovery loop until cancellation.
92    ///
93    /// Rotates through keywords across iterations to distribute API usage.
94    pub async fn run(
95        &self,
96        cancel: CancellationToken,
97        scheduler: LoopScheduler,
98        schedule: Option<Arc<ActiveSchedule>>,
99    ) {
100        tracing::info!(
101            dry_run = self.dry_run,
102            keywords = self.keywords.len(),
103            threshold = self.threshold,
104            "Discovery loop started"
105        );
106
107        if self.keywords.is_empty() {
108            tracing::warn!("No keywords configured, discovery loop has nothing to search");
109            cancel.cancelled().await;
110            return;
111        }
112
113        let mut error_tracker = ConsecutiveErrorTracker::new(10, Duration::from_secs(300));
114        let mut keyword_index = 0usize;
115
116        loop {
117            if cancel.is_cancelled() {
118                break;
119            }
120
121            if !schedule_gate(&schedule, &cancel).await {
122                break;
123            }
124
125            // Select next keyword (round-robin)
126            let keyword = &self.keywords[keyword_index % self.keywords.len()];
127            keyword_index += 1;
128
129            match self.search_and_process(keyword, None).await {
130                Ok((_results, summary)) => {
131                    error_tracker.record_success();
132                    if summary.tweets_found > 0 {
133                        tracing::info!(
134                            keyword = %keyword,
135                            found = summary.tweets_found,
136                            qualifying = summary.qualifying,
137                            replied = summary.replied,
138                            "Discovery iteration complete"
139                        );
140                    }
141                }
142                Err(e) => {
143                    let should_pause = error_tracker.record_error();
144                    tracing::warn!(
145                        keyword = %keyword,
146                        error = %e,
147                        consecutive_errors = error_tracker.count(),
148                        "Discovery iteration failed"
149                    );
150
151                    if should_pause {
152                        tracing::warn!(
153                            pause_secs = error_tracker.pause_duration().as_secs(),
154                            "Pausing discovery loop due to consecutive errors"
155                        );
156                        tokio::select! {
157                            _ = cancel.cancelled() => break,
158                            _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
159                        }
160                        error_tracker.reset();
161                        continue;
162                    }
163
164                    if let LoopError::RateLimited { retry_after } = &e {
165                        let backoff = super::loop_helpers::rate_limit_backoff(*retry_after, 0);
166                        tokio::select! {
167                            _ = cancel.cancelled() => break,
168                            _ = tokio::time::sleep(backoff) => {},
169                        }
170                        continue;
171                    }
172                }
173            }
174
175            tokio::select! {
176                _ = cancel.cancelled() => break,
177                _ = scheduler.tick() => {},
178            }
179        }
180
181        tracing::info!("Discovery loop stopped");
182    }
183
184    /// Run a single-shot discovery across all keywords.
185    ///
186    /// Used by the CLI `tuitbot discover` command. Searches all keywords
187    /// (not rotating) and returns all results sorted by score descending.
188    pub async fn run_once(
189        &self,
190        limit: Option<usize>,
191    ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
192        let mut all_results = Vec::new();
193        let mut summary = DiscoverySummary::default();
194        let mut total_processed = 0usize;
195        let mut last_error: Option<LoopError> = None;
196        let mut any_success = false;
197
198        for keyword in &self.keywords {
199            if let Some(max) = limit {
200                if total_processed >= max {
201                    break;
202                }
203            }
204
205            let remaining = limit.map(|max| max.saturating_sub(total_processed));
206            match self.search_and_process(keyword, remaining).await {
207                Ok((results, iter_summary)) => {
208                    any_success = true;
209                    summary.tweets_found += iter_summary.tweets_found;
210                    summary.qualifying += iter_summary.qualifying;
211                    summary.replied += iter_summary.replied;
212                    summary.skipped += iter_summary.skipped;
213                    summary.failed += iter_summary.failed;
214                    total_processed += iter_summary.tweets_found;
215                    all_results.extend(results);
216                }
217                Err(e) => {
218                    tracing::warn!(keyword = %keyword, error = %e, "Search failed for keyword");
219                    last_error = Some(e);
220                }
221            }
222        }
223
224        // If every keyword failed, surface the last error instead of
225        // reporting a misleading empty success.
226        if !any_success {
227            if let Some(err) = last_error {
228                return Err(err);
229            }
230        }
231
232        Ok((all_results, summary))
233    }
234
235    /// Search for a single keyword and process all results.
236    pub(crate) async fn search_and_process(
237        &self,
238        keyword: &str,
239        limit: Option<usize>,
240    ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
241        tracing::info!(keyword = %keyword, "Searching keyword");
242        let tweets = self.searcher.search_tweets(keyword).await?;
243
244        let mut summary = DiscoverySummary {
245            tweets_found: tweets.len(),
246            ..Default::default()
247        };
248
249        let to_process = match limit {
250            Some(n) => &tweets[..tweets.len().min(n)],
251            None => &tweets,
252        };
253
254        let mut results = Vec::with_capacity(to_process.len());
255
256        for tweet in to_process {
257            let result = self.process_tweet(tweet, keyword).await;
258
259            match &result {
260                DiscoveryResult::Replied { .. } => {
261                    summary.qualifying += 1;
262                    summary.replied += 1;
263                }
264                DiscoveryResult::BelowThreshold { .. } => {
265                    summary.skipped += 1;
266                }
267                DiscoveryResult::Skipped { .. } => {
268                    summary.skipped += 1;
269                }
270                DiscoveryResult::Failed { .. } => {
271                    summary.failed += 1;
272                }
273            }
274
275            results.push(result);
276        }
277
278        Ok((results, summary))
279    }
280
281    /// Process a single discovered tweet: dedup, score, generate reply, post.
282    pub(crate) async fn process_tweet(&self, tweet: &LoopTweet, keyword: &str) -> DiscoveryResult {
283        // Check if already discovered (dedup)
284        match self.storage.tweet_exists(&tweet.id).await {
285            Ok(true) => {
286                tracing::debug!(tweet_id = %tweet.id, "Tweet already discovered, skipping");
287                return DiscoveryResult::Skipped {
288                    tweet_id: tweet.id.clone(),
289                    reason: "already discovered".to_string(),
290                };
291            }
292            Ok(false) => {}
293            Err(e) => {
294                tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to check tweet existence");
295                // Continue anyway -- best effort dedup
296            }
297        }
298
299        // Score the tweet
300        let score_result = self.scorer.score(tweet);
301
302        // Store discovered tweet (even if below threshold, useful for analytics)
303        if let Err(e) = self
304            .storage
305            .store_discovered_tweet(tweet, score_result.total, keyword)
306            .await
307        {
308            tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to store discovered tweet");
309        }
310
311        // Check threshold
312        if !score_result.meets_threshold {
313            tracing::debug!(
314                tweet_id = %tweet.id,
315                score = score_result.total,
316                threshold = self.threshold,
317                "Tweet scored below threshold, skipping"
318            );
319            return DiscoveryResult::BelowThreshold {
320                tweet_id: tweet.id.clone(),
321                score: score_result.total,
322            };
323        }
324
325        // Safety checks
326        if self.safety.has_replied_to(&tweet.id).await {
327            return DiscoveryResult::Skipped {
328                tweet_id: tweet.id.clone(),
329                reason: "already replied".to_string(),
330            };
331        }
332
333        if !self.safety.can_reply().await {
334            return DiscoveryResult::Skipped {
335                tweet_id: tweet.id.clone(),
336                reason: "rate limited".to_string(),
337            };
338        }
339
340        // Generate reply with vault context (product mention always on for discovery)
341        let reply_output = match self
342            .generator
343            .generate_reply_with_rag(&tweet.text, &tweet.author_username, true)
344            .await
345        {
346            Ok(output) => output,
347            Err(e) => {
348                tracing::error!(
349                    tweet_id = %tweet.id,
350                    error = %e,
351                    "Failed to generate reply"
352                );
353                return DiscoveryResult::Failed {
354                    tweet_id: tweet.id.clone(),
355                    error: e.to_string(),
356                };
357            }
358        };
359        let reply_text = reply_output.text;
360
361        tracing::info!(
362            author = %tweet.author_username,
363            score = format!("{:.0}", score_result.total),
364            "Posted reply to @{}",
365            tweet.author_username,
366        );
367
368        if self.dry_run {
369            tracing::info!(
370                "DRY RUN: Tweet {} by @{} scored {:.0}/100 -- Would reply: \"{}\"",
371                tweet.id,
372                tweet.author_username,
373                score_result.total,
374                reply_text
375            );
376
377            let _ = self
378                .storage
379                .log_action(
380                    "discovery_reply",
381                    "dry_run",
382                    &format!(
383                        "Score {:.0}, reply to @{}: {}",
384                        score_result.total,
385                        tweet.author_username,
386                        truncate(&reply_text, 50)
387                    ),
388                )
389                .await;
390        } else {
391            if let Err(e) = self.poster.send_reply(&tweet.id, &reply_text).await {
392                tracing::error!(tweet_id = %tweet.id, error = %e, "Failed to send reply");
393                return DiscoveryResult::Failed {
394                    tweet_id: tweet.id.clone(),
395                    error: e.to_string(),
396                };
397            }
398
399            if let Err(e) = self.safety.record_reply(&tweet.id, &reply_text).await {
400                tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to record reply");
401            }
402
403            let _ = self
404                .storage
405                .log_action(
406                    "discovery_reply",
407                    "success",
408                    &format!(
409                        "Score {:.0}, replied to @{}: {}",
410                        score_result.total,
411                        tweet.author_username,
412                        truncate(&reply_text, 50)
413                    ),
414                )
415                .await;
416        }
417
418        DiscoveryResult::Replied {
419            tweet_id: tweet.id.clone(),
420            author: tweet.author_username.clone(),
421            score: score_result.total,
422            reply_text,
423        }
424    }
425}
426
/// Truncate a string for display, appending `...` when it is shortened.
///
/// `max_len` is a byte budget for the kept prefix; the `...` suffix is not
/// counted. If byte `max_len` falls inside a multi-byte UTF-8 character, the
/// cut moves back to the previous character boundary. The result is therefore
/// always valid UTF-8, and the function never panics on non-ASCII text such as
/// emoji in generated replies.
pub(crate) fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // Slicing mid-character panics, so walk back to a char boundary.
    // Index 0 is always a boundary, so this loop terminates.
    let mut end = max_len;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
435
436#[cfg(test)]
437mod tests;