1use super::loop_helpers::{
9 ConsecutiveErrorTracker, LoopError, LoopStorage, LoopTweet, PostSender, ReplyGenerator,
10 SafetyChecker, TweetScorer, TweetSearcher,
11};
12use super::schedule::{schedule_gate, ActiveSchedule};
13use super::scheduler::LoopScheduler;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio_util::sync::CancellationToken;
17
18pub struct DiscoveryLoop {
20 searcher: Arc<dyn TweetSearcher>,
21 scorer: Arc<dyn TweetScorer>,
22 generator: Arc<dyn ReplyGenerator>,
23 safety: Arc<dyn SafetyChecker>,
24 storage: Arc<dyn LoopStorage>,
25 poster: Arc<dyn PostSender>,
26 keywords: Vec<String>,
27 threshold: f32,
28 dry_run: bool,
29}
30
31#[derive(Debug)]
33pub enum DiscoveryResult {
34 Replied {
36 tweet_id: String,
37 author: String,
38 score: f32,
39 reply_text: String,
40 },
41 BelowThreshold { tweet_id: String, score: f32 },
43 Skipped { tweet_id: String, reason: String },
45 Failed { tweet_id: String, error: String },
47}
48
49#[derive(Debug, Default)]
51pub struct DiscoverySummary {
52 pub tweets_found: usize,
54 pub qualifying: usize,
56 pub replied: usize,
58 pub skipped: usize,
60 pub failed: usize,
62}
63
64impl DiscoveryLoop {
65 #[allow(clippy::too_many_arguments)]
67 pub fn new(
68 searcher: Arc<dyn TweetSearcher>,
69 scorer: Arc<dyn TweetScorer>,
70 generator: Arc<dyn ReplyGenerator>,
71 safety: Arc<dyn SafetyChecker>,
72 storage: Arc<dyn LoopStorage>,
73 poster: Arc<dyn PostSender>,
74 keywords: Vec<String>,
75 threshold: f32,
76 dry_run: bool,
77 ) -> Self {
78 Self {
79 searcher,
80 scorer,
81 generator,
82 safety,
83 storage,
84 poster,
85 keywords,
86 threshold,
87 dry_run,
88 }
89 }
90
91 pub async fn run(
95 &self,
96 cancel: CancellationToken,
97 scheduler: LoopScheduler,
98 schedule: Option<Arc<ActiveSchedule>>,
99 ) {
100 tracing::info!(
101 dry_run = self.dry_run,
102 keywords = self.keywords.len(),
103 threshold = self.threshold,
104 "Discovery loop started"
105 );
106
107 if self.keywords.is_empty() {
108 tracing::warn!("No keywords configured, discovery loop has nothing to search");
109 cancel.cancelled().await;
110 return;
111 }
112
113 let mut error_tracker = ConsecutiveErrorTracker::new(10, Duration::from_secs(300));
114 let mut keyword_index = 0usize;
115
116 loop {
117 if cancel.is_cancelled() {
118 break;
119 }
120
121 if !schedule_gate(&schedule, &cancel).await {
122 break;
123 }
124
125 let keyword = &self.keywords[keyword_index % self.keywords.len()];
127 keyword_index += 1;
128
129 match self.search_and_process(keyword, None).await {
130 Ok((_results, summary)) => {
131 error_tracker.record_success();
132 if summary.tweets_found > 0 {
133 tracing::info!(
134 keyword = %keyword,
135 found = summary.tweets_found,
136 qualifying = summary.qualifying,
137 replied = summary.replied,
138 "Discovery iteration complete"
139 );
140 }
141 }
142 Err(e) => {
143 let should_pause = error_tracker.record_error();
144 tracing::warn!(
145 keyword = %keyword,
146 error = %e,
147 consecutive_errors = error_tracker.count(),
148 "Discovery iteration failed"
149 );
150
151 if should_pause {
152 tracing::warn!(
153 pause_secs = error_tracker.pause_duration().as_secs(),
154 "Pausing discovery loop due to consecutive errors"
155 );
156 tokio::select! {
157 _ = cancel.cancelled() => break,
158 _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
159 }
160 error_tracker.reset();
161 continue;
162 }
163
164 if let LoopError::RateLimited { retry_after } = &e {
165 let backoff = super::loop_helpers::rate_limit_backoff(*retry_after, 0);
166 tokio::select! {
167 _ = cancel.cancelled() => break,
168 _ = tokio::time::sleep(backoff) => {},
169 }
170 continue;
171 }
172 }
173 }
174
175 tokio::select! {
176 _ = cancel.cancelled() => break,
177 _ = scheduler.tick() => {},
178 }
179 }
180
181 tracing::info!("Discovery loop stopped");
182 }
183
184 pub async fn run_once(
189 &self,
190 limit: Option<usize>,
191 ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
192 let mut all_results = Vec::new();
193 let mut summary = DiscoverySummary::default();
194 let mut total_processed = 0usize;
195
196 for keyword in &self.keywords {
197 if let Some(max) = limit {
198 if total_processed >= max {
199 break;
200 }
201 }
202
203 let remaining = limit.map(|max| max.saturating_sub(total_processed));
204 match self.search_and_process(keyword, remaining).await {
205 Ok((results, iter_summary)) => {
206 summary.tweets_found += iter_summary.tweets_found;
207 summary.qualifying += iter_summary.qualifying;
208 summary.replied += iter_summary.replied;
209 summary.skipped += iter_summary.skipped;
210 summary.failed += iter_summary.failed;
211 total_processed += iter_summary.tweets_found;
212 all_results.extend(results);
213 }
214 Err(e) => {
215 tracing::warn!(keyword = %keyword, error = %e, "Search failed for keyword");
216 }
217 }
218 }
219
220 Ok((all_results, summary))
221 }
222
223 async fn search_and_process(
225 &self,
226 keyword: &str,
227 limit: Option<usize>,
228 ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
229 tracing::info!(keyword = %keyword, "Searching keyword");
230 let tweets = self.searcher.search_tweets(keyword).await?;
231
232 let mut summary = DiscoverySummary {
233 tweets_found: tweets.len(),
234 ..Default::default()
235 };
236
237 let to_process = match limit {
238 Some(n) => &tweets[..tweets.len().min(n)],
239 None => &tweets,
240 };
241
242 let mut results = Vec::with_capacity(to_process.len());
243
244 for tweet in to_process {
245 let result = self.process_tweet(tweet, keyword).await;
246
247 match &result {
248 DiscoveryResult::Replied { .. } => {
249 summary.qualifying += 1;
250 summary.replied += 1;
251 }
252 DiscoveryResult::BelowThreshold { .. } => {
253 summary.skipped += 1;
254 }
255 DiscoveryResult::Skipped { .. } => {
256 summary.skipped += 1;
257 }
258 DiscoveryResult::Failed { .. } => {
259 summary.failed += 1;
260 }
261 }
262
263 results.push(result);
264 }
265
266 Ok((results, summary))
267 }
268
269 async fn process_tweet(&self, tweet: &LoopTweet, keyword: &str) -> DiscoveryResult {
271 match self.storage.tweet_exists(&tweet.id).await {
273 Ok(true) => {
274 tracing::debug!(tweet_id = %tweet.id, "Tweet already discovered, skipping");
275 return DiscoveryResult::Skipped {
276 tweet_id: tweet.id.clone(),
277 reason: "already discovered".to_string(),
278 };
279 }
280 Ok(false) => {}
281 Err(e) => {
282 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to check tweet existence");
283 }
285 }
286
287 let score_result = self.scorer.score(tweet);
289
290 if let Err(e) = self
292 .storage
293 .store_discovered_tweet(tweet, score_result.total, keyword)
294 .await
295 {
296 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to store discovered tweet");
297 }
298
299 if !score_result.meets_threshold {
301 tracing::debug!(
302 tweet_id = %tweet.id,
303 score = score_result.total,
304 threshold = self.threshold,
305 "Tweet scored below threshold, skipping"
306 );
307 return DiscoveryResult::BelowThreshold {
308 tweet_id: tweet.id.clone(),
309 score: score_result.total,
310 };
311 }
312
313 if self.safety.has_replied_to(&tweet.id).await {
315 return DiscoveryResult::Skipped {
316 tweet_id: tweet.id.clone(),
317 reason: "already replied".to_string(),
318 };
319 }
320
321 if !self.safety.can_reply().await {
322 return DiscoveryResult::Skipped {
323 tweet_id: tweet.id.clone(),
324 reason: "rate limited".to_string(),
325 };
326 }
327
328 let reply_text = match self
330 .generator
331 .generate_reply(&tweet.text, &tweet.author_username, true)
332 .await
333 {
334 Ok(text) => text,
335 Err(e) => {
336 tracing::error!(
337 tweet_id = %tweet.id,
338 error = %e,
339 "Failed to generate reply"
340 );
341 return DiscoveryResult::Failed {
342 tweet_id: tweet.id.clone(),
343 error: e.to_string(),
344 };
345 }
346 };
347
348 tracing::info!(
349 author = %tweet.author_username,
350 score = format!("{:.0}", score_result.total),
351 "Posted reply to @{}",
352 tweet.author_username,
353 );
354
355 if self.dry_run {
356 tracing::info!(
357 "DRY RUN: Tweet {} by @{} scored {:.0}/100 -- Would reply: \"{}\"",
358 tweet.id,
359 tweet.author_username,
360 score_result.total,
361 reply_text
362 );
363
364 let _ = self
365 .storage
366 .log_action(
367 "discovery_reply",
368 "dry_run",
369 &format!(
370 "Score {:.0}, reply to @{}: {}",
371 score_result.total,
372 tweet.author_username,
373 truncate(&reply_text, 50)
374 ),
375 )
376 .await;
377 } else {
378 if let Err(e) = self.poster.send_reply(&tweet.id, &reply_text).await {
379 tracing::error!(tweet_id = %tweet.id, error = %e, "Failed to send reply");
380 return DiscoveryResult::Failed {
381 tweet_id: tweet.id.clone(),
382 error: e.to_string(),
383 };
384 }
385
386 if let Err(e) = self.safety.record_reply(&tweet.id, &reply_text).await {
387 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to record reply");
388 }
389
390 let _ = self
391 .storage
392 .log_action(
393 "discovery_reply",
394 "success",
395 &format!(
396 "Score {:.0}, replied to @{}: {}",
397 score_result.total,
398 tweet.author_username,
399 truncate(&reply_text, 50)
400 ),
401 )
402 .await;
403 }
404
405 DiscoveryResult::Replied {
406 tweet_id: tweet.id.clone(),
407 author: tweet.author_username.clone(),
408 score: score_result.total,
409 reply_text,
410 }
411 }
412}
413
414fn truncate(s: &str, max_len: usize) -> String {
416 if s.len() <= max_len {
417 s.to_string()
418 } else {
419 format!("{}...", &s[..max_len])
420 }
421}
422
423#[cfg(test)]
424mod tests {
425 use super::*;
426 use crate::automation::ScoreResult;
427 use std::sync::Mutex;
428
429 struct MockSearcher {
432 results: Vec<LoopTweet>,
433 }
434
435 #[async_trait::async_trait]
436 impl TweetSearcher for MockSearcher {
437 async fn search_tweets(&self, _query: &str) -> Result<Vec<LoopTweet>, LoopError> {
438 Ok(self.results.clone())
439 }
440 }
441
442 struct FailingSearcher;
443
444 #[async_trait::async_trait]
445 impl TweetSearcher for FailingSearcher {
446 async fn search_tweets(&self, _query: &str) -> Result<Vec<LoopTweet>, LoopError> {
447 Err(LoopError::RateLimited {
448 retry_after: Some(60),
449 })
450 }
451 }
452
453 struct MockScorer {
454 score: f32,
455 meets_threshold: bool,
456 }
457
458 impl TweetScorer for MockScorer {
459 fn score(&self, _tweet: &LoopTweet) -> ScoreResult {
460 ScoreResult {
461 total: self.score,
462 meets_threshold: self.meets_threshold,
463 matched_keywords: vec!["test".to_string()],
464 }
465 }
466 }
467
468 struct MockGenerator {
469 reply: String,
470 }
471
472 #[async_trait::async_trait]
473 impl ReplyGenerator for MockGenerator {
474 async fn generate_reply(
475 &self,
476 _tweet_text: &str,
477 _author: &str,
478 _mention_product: bool,
479 ) -> Result<String, LoopError> {
480 Ok(self.reply.clone())
481 }
482 }
483
484 struct MockSafety {
485 can_reply: bool,
486 replied_ids: Mutex<Vec<String>>,
487 }
488
489 impl MockSafety {
490 fn new(can_reply: bool) -> Self {
491 Self {
492 can_reply,
493 replied_ids: Mutex::new(Vec::new()),
494 }
495 }
496 }
497
498 #[async_trait::async_trait]
499 impl SafetyChecker for MockSafety {
500 async fn can_reply(&self) -> bool {
501 self.can_reply
502 }
503 async fn has_replied_to(&self, tweet_id: &str) -> bool {
504 self.replied_ids
505 .lock()
506 .expect("lock")
507 .contains(&tweet_id.to_string())
508 }
509 async fn record_reply(&self, tweet_id: &str, _content: &str) -> Result<(), LoopError> {
510 self.replied_ids
511 .lock()
512 .expect("lock")
513 .push(tweet_id.to_string());
514 Ok(())
515 }
516 }
517
518 struct MockStorage {
519 existing_ids: Mutex<Vec<String>>,
520 discovered: Mutex<Vec<String>>,
521 actions: Mutex<Vec<(String, String, String)>>,
522 }
523
524 impl MockStorage {
525 fn new() -> Self {
526 Self {
527 existing_ids: Mutex::new(Vec::new()),
528 discovered: Mutex::new(Vec::new()),
529 actions: Mutex::new(Vec::new()),
530 }
531 }
532 }
533
534 #[async_trait::async_trait]
535 impl LoopStorage for MockStorage {
536 async fn get_cursor(&self, _key: &str) -> Result<Option<String>, LoopError> {
537 Ok(None)
538 }
539 async fn set_cursor(&self, _key: &str, _value: &str) -> Result<(), LoopError> {
540 Ok(())
541 }
542 async fn tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
543 Ok(self
544 .existing_ids
545 .lock()
546 .expect("lock")
547 .contains(&tweet_id.to_string()))
548 }
549 async fn store_discovered_tweet(
550 &self,
551 tweet: &LoopTweet,
552 _score: f32,
553 _keyword: &str,
554 ) -> Result<(), LoopError> {
555 self.discovered.lock().expect("lock").push(tweet.id.clone());
556 Ok(())
557 }
558 async fn log_action(
559 &self,
560 action_type: &str,
561 status: &str,
562 message: &str,
563 ) -> Result<(), LoopError> {
564 self.actions.lock().expect("lock").push((
565 action_type.to_string(),
566 status.to_string(),
567 message.to_string(),
568 ));
569 Ok(())
570 }
571 }
572
573 struct MockPoster {
574 sent: Mutex<Vec<(String, String)>>,
575 }
576
577 impl MockPoster {
578 fn new() -> Self {
579 Self {
580 sent: Mutex::new(Vec::new()),
581 }
582 }
583 fn sent_count(&self) -> usize {
584 self.sent.lock().expect("lock").len()
585 }
586 }
587
588 #[async_trait::async_trait]
589 impl PostSender for MockPoster {
590 async fn send_reply(&self, tweet_id: &str, content: &str) -> Result<(), LoopError> {
591 self.sent
592 .lock()
593 .expect("lock")
594 .push((tweet_id.to_string(), content.to_string()));
595 Ok(())
596 }
597 }
598
599 fn test_tweet(id: &str, author: &str) -> LoopTweet {
600 LoopTweet {
601 id: id.to_string(),
602 text: format!("Test tweet about rust from @{author}"),
603 author_id: format!("uid_{author}"),
604 author_username: author.to_string(),
605 author_followers: 5000,
606 created_at: "2026-01-01T00:00:00Z".to_string(),
607 likes: 20,
608 retweets: 5,
609 replies: 3,
610 }
611 }
612
613 fn build_loop(
614 tweets: Vec<LoopTweet>,
615 score: f32,
616 meets_threshold: bool,
617 dry_run: bool,
618 ) -> (DiscoveryLoop, Arc<MockPoster>, Arc<MockStorage>) {
619 let poster = Arc::new(MockPoster::new());
620 let storage = Arc::new(MockStorage::new());
621 let discovery = DiscoveryLoop::new(
622 Arc::new(MockSearcher { results: tweets }),
623 Arc::new(MockScorer {
624 score,
625 meets_threshold,
626 }),
627 Arc::new(MockGenerator {
628 reply: "Great insight!".to_string(),
629 }),
630 Arc::new(MockSafety::new(true)),
631 storage.clone(),
632 poster.clone(),
633 vec!["rust".to_string(), "cli".to_string()],
634 70.0,
635 dry_run,
636 );
637 (discovery, poster, storage)
638 }
639
640 #[tokio::test]
643 async fn search_and_process_no_results() {
644 let (discovery, poster, _) = build_loop(Vec::new(), 80.0, true, false);
645 let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();
646 assert_eq!(summary.tweets_found, 0);
647 assert!(results.is_empty());
648 assert_eq!(poster.sent_count(), 0);
649 }
650
651 #[tokio::test]
652 async fn search_and_process_above_threshold() {
653 let tweets = vec![test_tweet("100", "alice"), test_tweet("101", "bob")];
654 let (discovery, poster, storage) = build_loop(tweets, 85.0, true, false);
655
656 let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();
657
658 assert_eq!(summary.tweets_found, 2);
659 assert_eq!(summary.replied, 2);
660 assert_eq!(results.len(), 2);
661 assert_eq!(poster.sent_count(), 2);
662
663 let discovered = storage.discovered.lock().expect("lock");
665 assert_eq!(discovered.len(), 2);
666 }
667
668 #[tokio::test]
669 async fn search_and_process_below_threshold() {
670 let tweets = vec![test_tweet("100", "alice")];
671 let (discovery, poster, storage) = build_loop(tweets, 40.0, false, false);
672
673 let (results, summary) = discovery.search_and_process("rust", None).await.unwrap();
674
675 assert_eq!(summary.tweets_found, 1);
676 assert_eq!(summary.skipped, 1);
677 assert_eq!(summary.replied, 0);
678 assert_eq!(results.len(), 1);
679 assert_eq!(poster.sent_count(), 0);
680
681 let discovered = storage.discovered.lock().expect("lock");
683 assert_eq!(discovered.len(), 1);
684 }
685
686 #[tokio::test]
687 async fn search_and_process_dry_run() {
688 let tweets = vec![test_tweet("100", "alice")];
689 let (discovery, poster, _) = build_loop(tweets, 85.0, true, true);
690
691 let (_results, summary) = discovery.search_and_process("rust", None).await.unwrap();
692
693 assert_eq!(summary.replied, 1);
694 assert_eq!(poster.sent_count(), 0);
696 }
697
698 #[tokio::test]
699 async fn search_and_process_skips_existing() {
700 let tweets = vec![test_tweet("100", "alice")];
701 let poster = Arc::new(MockPoster::new());
702 let storage = Arc::new(MockStorage::new());
703 storage
705 .existing_ids
706 .lock()
707 .expect("lock")
708 .push("100".to_string());
709
710 let discovery = DiscoveryLoop::new(
711 Arc::new(MockSearcher { results: tweets }),
712 Arc::new(MockScorer {
713 score: 85.0,
714 meets_threshold: true,
715 }),
716 Arc::new(MockGenerator {
717 reply: "Great!".to_string(),
718 }),
719 Arc::new(MockSafety::new(true)),
720 storage,
721 poster.clone(),
722 vec!["rust".to_string()],
723 70.0,
724 false,
725 );
726
727 let (_results, summary) = discovery.search_and_process("rust", None).await.unwrap();
728 assert_eq!(summary.skipped, 1);
729 assert_eq!(poster.sent_count(), 0);
730 }
731
732 #[tokio::test]
733 async fn search_and_process_respects_limit() {
734 let tweets = vec![
735 test_tweet("100", "alice"),
736 test_tweet("101", "bob"),
737 test_tweet("102", "carol"),
738 ];
739 let (discovery, poster, _) = build_loop(tweets, 85.0, true, false);
740
741 let (results, summary) = discovery.search_and_process("rust", Some(2)).await.unwrap();
742
743 assert_eq!(summary.tweets_found, 3); assert_eq!(results.len(), 2); assert_eq!(poster.sent_count(), 2); }
747
748 #[tokio::test]
749 async fn run_once_searches_all_keywords() {
750 let tweets = vec![test_tweet("100", "alice")];
751 let (discovery, _, _) = build_loop(tweets, 85.0, true, false);
752
753 let (_, summary) = discovery.run_once(None).await.unwrap();
754 assert_eq!(summary.tweets_found, 2); }
757
758 #[tokio::test]
759 async fn search_error_returns_loop_error() {
760 let poster = Arc::new(MockPoster::new());
761 let storage = Arc::new(MockStorage::new());
762 let discovery = DiscoveryLoop::new(
763 Arc::new(FailingSearcher),
764 Arc::new(MockScorer {
765 score: 85.0,
766 meets_threshold: true,
767 }),
768 Arc::new(MockGenerator {
769 reply: "test".to_string(),
770 }),
771 Arc::new(MockSafety::new(true)),
772 storage,
773 poster,
774 vec!["rust".to_string()],
775 70.0,
776 false,
777 );
778
779 let result = discovery.search_and_process("rust", None).await;
780 assert!(result.is_err());
781 }
782}