1use super::loop_helpers::{
10 ConsecutiveErrorTracker, LoopError, LoopTweet, PostSender, ReplyGenerator, SafetyChecker,
11};
12use super::schedule::{schedule_gate, ActiveSchedule};
13use super::scheduler::LoopScheduler;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio_util::sync::CancellationToken;
17
18#[async_trait::async_trait]
24pub trait TargetTweetFetcher: Send + Sync {
25 async fn fetch_user_tweets(&self, user_id: &str) -> Result<Vec<LoopTweet>, LoopError>;
27}
28
29#[async_trait::async_trait]
31pub trait TargetUserManager: Send + Sync {
32 async fn lookup_user(&self, username: &str) -> Result<(String, String), LoopError>;
34}
35
36#[allow(clippy::too_many_arguments)]
38#[async_trait::async_trait]
39pub trait TargetStorage: Send + Sync {
40 async fn upsert_target_account(
42 &self,
43 account_id: &str,
44 username: &str,
45 ) -> Result<(), LoopError>;
46
47 async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError>;
49
50 async fn store_target_tweet(
52 &self,
53 tweet_id: &str,
54 account_id: &str,
55 content: &str,
56 created_at: &str,
57 reply_count: i64,
58 like_count: i64,
59 relevance_score: f64,
60 ) -> Result<(), LoopError>;
61
62 async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError>;
64
65 async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError>;
67
68 async fn count_target_replies_today(&self) -> Result<i64, LoopError>;
70
71 async fn log_action(
73 &self,
74 action_type: &str,
75 status: &str,
76 message: &str,
77 ) -> Result<(), LoopError>;
78}
79
/// Settings that control the target monitoring loop.
///
/// Derives `PartialEq`/`Eq` in addition to `Debug`/`Clone` so configurations can be
/// compared directly (for example in tests or change detection).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TargetLoopConfig {
    /// Usernames of the accounts to monitor; each is looked up on every iteration.
    pub accounts: Vec<String>,
    /// Maximum number of target replies per day, shared across all accounts.
    pub max_target_replies_per_day: u32,
    /// When `true`, replies are generated and logged but never posted.
    pub dry_run: bool,
}
94
/// Outcome of handling a single tweet from a target account.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so outcomes can be
/// copied and compared with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TargetResult {
    /// A reply was produced for the tweet (posted, or only logged in dry-run mode).
    Replied {
        /// Id of the tweet that was replied to.
        tweet_id: String,
        /// Username of the account that authored the tweet.
        account: String,
        /// Text of the generated reply.
        reply_text: String,
    },
    /// The tweet was deliberately not replied to; `reason` says why.
    Skipped { tweet_id: String, reason: String },
    /// Generating or posting the reply failed; `error` is the rendered error.
    Failed { tweet_id: String, error: String },
}
113
114pub struct TargetLoop {
120 fetcher: Arc<dyn TargetTweetFetcher>,
121 user_mgr: Arc<dyn TargetUserManager>,
122 generator: Arc<dyn ReplyGenerator>,
123 safety: Arc<dyn SafetyChecker>,
124 storage: Arc<dyn TargetStorage>,
125 poster: Arc<dyn PostSender>,
126 config: TargetLoopConfig,
127}
128
129impl TargetLoop {
130 #[allow(clippy::too_many_arguments)]
132 pub fn new(
133 fetcher: Arc<dyn TargetTweetFetcher>,
134 user_mgr: Arc<dyn TargetUserManager>,
135 generator: Arc<dyn ReplyGenerator>,
136 safety: Arc<dyn SafetyChecker>,
137 storage: Arc<dyn TargetStorage>,
138 poster: Arc<dyn PostSender>,
139 config: TargetLoopConfig,
140 ) -> Self {
141 Self {
142 fetcher,
143 user_mgr,
144 generator,
145 safety,
146 storage,
147 poster,
148 config,
149 }
150 }
151
152 pub async fn run(
154 &self,
155 cancel: CancellationToken,
156 scheduler: LoopScheduler,
157 schedule: Option<Arc<ActiveSchedule>>,
158 ) {
159 tracing::info!(
160 dry_run = self.config.dry_run,
161 accounts = self.config.accounts.len(),
162 max_replies = self.config.max_target_replies_per_day,
163 "Target monitoring loop started"
164 );
165
166 if self.config.accounts.is_empty() {
167 tracing::info!("No target accounts configured, target loop has nothing to do");
168 cancel.cancelled().await;
169 return;
170 }
171
172 let mut error_tracker = ConsecutiveErrorTracker::new(10, Duration::from_secs(300));
173
174 loop {
175 if cancel.is_cancelled() {
176 break;
177 }
178
179 if !schedule_gate(&schedule, &cancel).await {
180 break;
181 }
182
183 match self.run_iteration().await {
184 Ok(results) => {
185 error_tracker.record_success();
186 let replied = results
187 .iter()
188 .filter(|r| matches!(r, TargetResult::Replied { .. }))
189 .count();
190 let skipped = results
191 .iter()
192 .filter(|r| matches!(r, TargetResult::Skipped { .. }))
193 .count();
194 if !results.is_empty() {
195 tracing::info!(
196 total = results.len(),
197 replied = replied,
198 skipped = skipped,
199 "Target iteration complete"
200 );
201 }
202 }
203 Err(e) => {
204 let should_pause = error_tracker.record_error();
205 tracing::warn!(
206 error = %e,
207 consecutive_errors = error_tracker.count(),
208 "Target iteration failed"
209 );
210
211 if should_pause {
212 tracing::warn!(
213 pause_secs = error_tracker.pause_duration().as_secs(),
214 "Pausing target loop due to consecutive errors"
215 );
216 tokio::select! {
217 _ = cancel.cancelled() => break,
218 _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
219 }
220 error_tracker.reset();
221 continue;
222 }
223 }
224 }
225
226 tokio::select! {
227 _ = cancel.cancelled() => break,
228 _ = scheduler.tick() => {},
229 }
230 }
231
232 tracing::info!("Target monitoring loop stopped");
233 }
234
235 pub async fn run_iteration(&self) -> Result<Vec<TargetResult>, LoopError> {
237 let mut all_results = Vec::new();
238
239 let replies_today = self.storage.count_target_replies_today().await?;
241 if replies_today >= self.config.max_target_replies_per_day as i64 {
242 tracing::debug!(
243 replies_today = replies_today,
244 limit = self.config.max_target_replies_per_day,
245 "Target reply daily limit reached"
246 );
247 return Ok(all_results);
248 }
249
250 let mut remaining_replies =
251 (self.config.max_target_replies_per_day as i64 - replies_today) as usize;
252
253 for username in &self.config.accounts {
254 if remaining_replies == 0 {
255 break;
256 }
257
258 match self.process_account(username, remaining_replies).await {
259 Ok(results) => {
260 let replied_count = results
261 .iter()
262 .filter(|r| matches!(r, TargetResult::Replied { .. }))
263 .count();
264 remaining_replies = remaining_replies.saturating_sub(replied_count);
265 all_results.extend(results);
266 }
267 Err(e) => {
268 if matches!(e, LoopError::AuthExpired) {
271 tracing::error!(
272 username = %username,
273 "X API authentication expired, re-authenticate with `tuitbot init`"
274 );
275 return Err(e);
276 }
277
278 tracing::warn!(
279 username = %username,
280 error = %e,
281 "Failed to process target account"
282 );
283 }
284 }
285 }
286
287 Ok(all_results)
288 }
289
290 async fn process_account(
292 &self,
293 username: &str,
294 max_replies: usize,
295 ) -> Result<Vec<TargetResult>, LoopError> {
296 let (user_id, resolved_username) = self.user_mgr.lookup_user(username).await?;
298
299 self.storage
301 .upsert_target_account(&user_id, &resolved_username)
302 .await?;
303
304 let tweets = self.fetcher.fetch_user_tweets(&user_id).await?;
306 tracing::info!(
307 username = %resolved_username,
308 count = tweets.len(),
309 "Monitoring @{}, found {} new tweets",
310 resolved_username,
311 tweets.len(),
312 );
313
314 let mut results = Vec::new();
315
316 for tweet in tweets.iter().take(max_replies) {
317 let result = self
318 .process_target_tweet(tweet, &user_id, &resolved_username)
319 .await;
320 if matches!(result, TargetResult::Replied { .. }) {
321 results.push(result);
322 break;
324 }
325 results.push(result);
326 }
327
328 Ok(results)
329 }
330
331 async fn process_target_tweet(
333 &self,
334 tweet: &LoopTweet,
335 account_id: &str,
336 username: &str,
337 ) -> TargetResult {
338 match self.storage.target_tweet_exists(&tweet.id).await {
340 Ok(true) => {
341 return TargetResult::Skipped {
342 tweet_id: tweet.id.clone(),
343 reason: "already discovered".to_string(),
344 };
345 }
346 Ok(false) => {}
347 Err(e) => {
348 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to check target tweet");
349 }
350 }
351
352 let _ = self
354 .storage
355 .store_target_tweet(
356 &tweet.id,
357 account_id,
358 &tweet.text,
359 &tweet.created_at,
360 tweet.replies as i64,
361 tweet.likes as i64,
362 0.0,
363 )
364 .await;
365
366 if self.safety.has_replied_to(&tweet.id).await {
368 return TargetResult::Skipped {
369 tweet_id: tweet.id.clone(),
370 reason: "already replied".to_string(),
371 };
372 }
373
374 if !self.safety.can_reply().await {
375 return TargetResult::Skipped {
376 tweet_id: tweet.id.clone(),
377 reason: "rate limited".to_string(),
378 };
379 }
380
381 let reply_text = match self
383 .generator
384 .generate_reply(&tweet.text, username, false)
385 .await
386 {
387 Ok(text) => text,
388 Err(e) => {
389 return TargetResult::Failed {
390 tweet_id: tweet.id.clone(),
391 error: e.to_string(),
392 };
393 }
394 };
395
396 tracing::info!(
397 username = %username,
398 "Replied to target @{}",
399 username,
400 );
401
402 if self.config.dry_run {
403 tracing::info!(
404 "DRY RUN: Target @{} tweet {} -- Would reply: \"{}\"",
405 username,
406 tweet.id,
407 reply_text
408 );
409
410 let _ = self
411 .storage
412 .log_action(
413 "target_reply",
414 "dry_run",
415 &format!("Reply to @{username}: {}", truncate(&reply_text, 50)),
416 )
417 .await;
418 } else {
419 if let Err(e) = self.poster.send_reply(&tweet.id, &reply_text).await {
420 return TargetResult::Failed {
421 tweet_id: tweet.id.clone(),
422 error: e.to_string(),
423 };
424 }
425
426 if let Err(e) = self.safety.record_reply(&tweet.id, &reply_text).await {
427 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to record reply");
428 }
429
430 let _ = self.storage.mark_target_tweet_replied(&tweet.id).await;
432 let _ = self.storage.record_target_reply(account_id).await;
433
434 let _ = self
435 .storage
436 .log_action(
437 "target_reply",
438 "success",
439 &format!("Replied to @{username}: {}", truncate(&reply_text, 50)),
440 )
441 .await;
442 }
443
444 TargetResult::Replied {
445 tweet_id: tweet.id.clone(),
446 account: username.to_string(),
447 reply_text,
448 }
449 }
450}
451
/// Shortens `s` to at most `max_len` bytes for display, appending `...` when text was cut.
///
/// Strings that already fit are returned unchanged. When `max_len` falls inside a
/// multi-byte UTF-8 character, the cut moves back to the start of that character so the
/// slice never splits a code point (slicing at a non-boundary would panic).
fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }

    // Index 0 is always a char boundary, so this loop terminates.
    let mut end = max_len;
    while !s.is_char_boundary(end) {
        end -= 1;
    }

    format!("{}...", &s[..end])
}
460
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Mutex;

    // --- Mock collaborators -------------------------------------------------

    /// Fetcher stub that returns the same canned tweets for every user id.
    struct MockFetcher {
        tweets: Vec<LoopTweet>,
    }

    #[async_trait::async_trait]
    impl TargetTweetFetcher for MockFetcher {
        async fn fetch_user_tweets(&self, _user_id: &str) -> Result<Vec<LoopTweet>, LoopError> {
            Ok(self.tweets.to_vec())
        }
    }

    /// User manager stub backed by `(lookup name, user id, resolved name)` triples.
    struct MockUserManager {
        users: Vec<(String, String, String)>,
    }

    #[async_trait::async_trait]
    impl TargetUserManager for MockUserManager {
        async fn lookup_user(&self, username: &str) -> Result<(String, String), LoopError> {
            self.users
                .iter()
                .find(|(name, _, _)| name == username)
                .map(|(_, uid, resolved)| (uid.clone(), resolved.clone()))
                .ok_or_else(|| LoopError::Other(format!("user not found: {username}")))
        }
    }

    /// Generator stub that always answers with the same reply text.
    struct MockGenerator {
        reply: String,
    }

    #[async_trait::async_trait]
    impl ReplyGenerator for MockGenerator {
        async fn generate_reply(
            &self,
            _tweet_text: &str,
            _author: &str,
            _mention_product: bool,
        ) -> Result<String, LoopError> {
            Ok(self.reply.clone())
        }
    }

    /// Safety checker stub with a fixed `can_reply` answer and an in-memory reply history.
    struct MockSafety {
        can_reply: bool,
        replied_ids: Mutex<Vec<String>>,
    }

    impl MockSafety {
        fn new(can_reply: bool) -> Self {
            MockSafety {
                can_reply,
                replied_ids: Mutex::new(vec![]),
            }
        }
    }

    #[async_trait::async_trait]
    impl SafetyChecker for MockSafety {
        async fn can_reply(&self) -> bool {
            self.can_reply
        }

        async fn has_replied_to(&self, tweet_id: &str) -> bool {
            let history = self.replied_ids.lock().expect("lock");
            history.iter().any(|id| id == tweet_id)
        }

        async fn record_reply(&self, tweet_id: &str, _content: &str) -> Result<(), LoopError> {
            let mut history = self.replied_ids.lock().expect("lock");
            history.push(tweet_id.to_owned());
            Ok(())
        }
    }

    /// Storage stub tracking known tweet ids and today's reply counter in memory.
    struct MockTargetStorage {
        existing_tweets: Mutex<Vec<String>>,
        replies_today: Mutex<i64>,
    }

    impl MockTargetStorage {
        fn new() -> Self {
            MockTargetStorage {
                existing_tweets: Mutex::new(vec![]),
                replies_today: Mutex::new(0),
            }
        }
    }

    #[async_trait::async_trait]
    impl TargetStorage for MockTargetStorage {
        async fn upsert_target_account(
            &self,
            _account_id: &str,
            _username: &str,
        ) -> Result<(), LoopError> {
            Ok(())
        }

        async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError> {
            let known = self
                .existing_tweets
                .lock()
                .expect("lock")
                .iter()
                .any(|id| id == tweet_id);
            Ok(known)
        }

        async fn store_target_tweet(
            &self,
            _tweet_id: &str,
            _account_id: &str,
            _content: &str,
            _created_at: &str,
            _reply_count: i64,
            _like_count: i64,
            _relevance_score: f64,
        ) -> Result<(), LoopError> {
            Ok(())
        }

        async fn mark_target_tweet_replied(&self, _tweet_id: &str) -> Result<(), LoopError> {
            Ok(())
        }

        async fn record_target_reply(&self, _account_id: &str) -> Result<(), LoopError> {
            let mut counter = self.replies_today.lock().expect("lock");
            *counter += 1;
            Ok(())
        }

        async fn count_target_replies_today(&self) -> Result<i64, LoopError> {
            let counter = self.replies_today.lock().expect("lock");
            Ok(*counter)
        }

        async fn log_action(
            &self,
            _action_type: &str,
            _status: &str,
            _message: &str,
        ) -> Result<(), LoopError> {
            Ok(())
        }
    }

    /// Poster stub that records every `(tweet id, content)` pair it is asked to send.
    struct MockPoster {
        sent: Mutex<Vec<(String, String)>>,
    }

    impl MockPoster {
        fn new() -> Self {
            MockPoster {
                sent: Mutex::new(vec![]),
            }
        }

        fn sent_count(&self) -> usize {
            let sent = self.sent.lock().expect("lock");
            sent.len()
        }
    }

    #[async_trait::async_trait]
    impl PostSender for MockPoster {
        async fn send_reply(&self, tweet_id: &str, content: &str) -> Result<(), LoopError> {
            let mut sent = self.sent.lock().expect("lock");
            sent.push((tweet_id.to_owned(), content.to_owned()));
            Ok(())
        }
    }

    // --- Fixtures -----------------------------------------------------------

    /// Builds a tweet with fixed engagement numbers authored by `author`.
    fn test_tweet(id: &str, author: &str) -> LoopTweet {
        LoopTweet {
            id: id.to_owned(),
            text: format!("Interesting thoughts on tech from @{author}"),
            author_id: format!("uid_{author}"),
            author_username: author.to_owned(),
            author_followers: 5000,
            created_at: "2026-01-01T00:00:00Z".to_owned(),
            likes: 10,
            retweets: 2,
            replies: 1,
        }
    }

    /// Live (non-dry-run) configuration watching `alice` with a limit of three replies.
    fn default_config() -> TargetLoopConfig {
        TargetLoopConfig {
            accounts: vec![String::from("alice")],
            max_target_replies_per_day: 3,
            dry_run: false,
        }
    }

    /// Wires a `TargetLoop` from mocks that only know the user `alice`, returning the
    /// poster as well so tests can count how many replies were sent.
    fn build_loop(
        tweets: Vec<LoopTweet>,
        config: TargetLoopConfig,
        storage: Arc<MockTargetStorage>,
    ) -> (TargetLoop, Arc<MockPoster>) {
        let poster = Arc::new(MockPoster::new());
        let alice = (
            String::from("alice"),
            String::from("uid_alice"),
            String::from("alice"),
        );
        let user_mgr = Arc::new(MockUserManager { users: vec![alice] });
        let generator = Arc::new(MockGenerator {
            reply: String::from("Great point!"),
        });

        let target_loop = TargetLoop::new(
            Arc::new(MockFetcher { tweets }),
            user_mgr,
            generator,
            Arc::new(MockSafety::new(true)),
            storage,
            poster.clone(),
            config,
        );
        (target_loop, poster)
    }

    // --- Iteration behaviour ------------------------------------------------

    #[tokio::test]
    async fn empty_accounts_does_nothing() {
        let config = TargetLoopConfig {
            accounts: Vec::new(),
            ..default_config()
        };
        let (target_loop, poster) =
            build_loop(Vec::new(), config, Arc::new(MockTargetStorage::new()));

        let outcomes = target_loop.run_iteration().await.expect("iteration");
        assert!(outcomes.is_empty());
        assert_eq!(poster.sent_count(), 0);
    }

    #[tokio::test]
    async fn replies_to_target_tweet() {
        let (target_loop, poster) = build_loop(
            vec![test_tweet("tw1", "alice")],
            default_config(),
            Arc::new(MockTargetStorage::new()),
        );

        let outcomes = target_loop.run_iteration().await.expect("iteration");
        assert_eq!(outcomes.len(), 1);
        assert!(matches!(
            outcomes.as_slice(),
            [TargetResult::Replied { .. }]
        ));
        assert_eq!(poster.sent_count(), 1);
    }

    #[tokio::test]
    async fn skips_existing_target_tweet() {
        // The tweet is already known to storage before the iteration starts.
        let storage = Arc::new(MockTargetStorage {
            existing_tweets: Mutex::new(vec![String::from("tw1")]),
            replies_today: Mutex::new(0),
        });
        let (target_loop, poster) =
            build_loop(vec![test_tweet("tw1", "alice")], default_config(), storage);

        let outcomes = target_loop.run_iteration().await.expect("iteration");
        assert_eq!(outcomes.len(), 1);
        assert!(matches!(
            outcomes.as_slice(),
            [TargetResult::Skipped { .. }]
        ));
        assert_eq!(poster.sent_count(), 0);
    }

    #[tokio::test]
    async fn respects_daily_limit() {
        // Three replies already counted today equals the configured limit of three.
        let storage = Arc::new(MockTargetStorage {
            existing_tweets: Mutex::new(Vec::new()),
            replies_today: Mutex::new(3),
        });
        let (target_loop, poster) =
            build_loop(vec![test_tweet("tw1", "alice")], default_config(), storage);

        let outcomes = target_loop.run_iteration().await.expect("iteration");
        assert!(outcomes.is_empty());
        assert_eq!(poster.sent_count(), 0);
    }

    #[tokio::test]
    async fn dry_run_does_not_post() {
        let config = TargetLoopConfig {
            dry_run: true,
            ..default_config()
        };
        let (target_loop, poster) = build_loop(
            vec![test_tweet("tw1", "alice")],
            config,
            Arc::new(MockTargetStorage::new()),
        );

        let outcomes = target_loop.run_iteration().await.expect("iteration");
        assert_eq!(outcomes.len(), 1);
        // Dry runs still report `Replied`, but nothing reaches the poster.
        assert!(matches!(
            outcomes.as_slice(),
            [TargetResult::Replied { .. }]
        ));
        assert_eq!(poster.sent_count(), 0);
    }

    // --- truncate -----------------------------------------------------------

    #[test]
    fn truncate_short_string() {
        let shortened = truncate("hello", 10);
        assert_eq!(shortened, "hello");
    }

    #[test]
    fn truncate_long_string() {
        let shortened = truncate("hello world", 5);
        assert_eq!(shortened, "hello...");
    }

    // --- Error handling -----------------------------------------------------

    /// User manager stub that fails every lookup with a preset error and counts the calls.
    struct MockAuthExpiredUserManager {
        lookup_count: AtomicU32,
        error: LoopError,
    }

    impl MockAuthExpiredUserManager {
        fn auth_expired() -> Self {
            MockAuthExpiredUserManager {
                lookup_count: AtomicU32::new(0),
                error: LoopError::AuthExpired,
            }
        }
    }

    #[async_trait::async_trait]
    impl TargetUserManager for MockAuthExpiredUserManager {
        async fn lookup_user(&self, _username: &str) -> Result<(String, String), LoopError> {
            self.lookup_count.fetch_add(1, Ordering::SeqCst);
            // Rebuild the stored error by hand; only these two variants are ever configured.
            let failure = match &self.error {
                LoopError::AuthExpired => LoopError::AuthExpired,
                LoopError::Other(msg) => LoopError::Other(msg.clone()),
                _ => unreachable!(),
            };
            Err(failure)
        }
    }

    /// User manager stub whose first lookup fails and whose later lookups succeed.
    struct MockPartialFailUserManager {
        call_count: AtomicU32,
    }

    #[async_trait::async_trait]
    impl TargetUserManager for MockPartialFailUserManager {
        async fn lookup_user(&self, username: &str) -> Result<(String, String), LoopError> {
            let previous_calls = self.call_count.fetch_add(1, Ordering::SeqCst);
            if previous_calls != 0 {
                Ok((format!("uid_{username}"), username.to_owned()))
            } else {
                Err(LoopError::Other(String::from("transient failure")))
            }
        }
    }

    #[tokio::test]
    async fn auth_expired_stops_iteration() {
        let user_mgr = Arc::new(MockAuthExpiredUserManager::auth_expired());
        let config = TargetLoopConfig {
            accounts: ["alice", "bob", "charlie"]
                .iter()
                .map(|name| name.to_string())
                .collect(),
            ..default_config()
        };

        let target_loop = TargetLoop::new(
            Arc::new(MockFetcher { tweets: Vec::new() }),
            user_mgr.clone(),
            Arc::new(MockGenerator {
                reply: String::from("Great!"),
            }),
            Arc::new(MockSafety::new(true)),
            Arc::new(MockTargetStorage::new()),
            Arc::new(MockPoster::new()),
            config,
        );

        let result = target_loop.run_iteration().await;
        assert!(matches!(result, Err(LoopError::AuthExpired)));
        // Only the first account was attempted before the iteration aborted.
        assert_eq!(user_mgr.lookup_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn non_auth_error_continues_iteration() {
        let user_mgr = Arc::new(MockPartialFailUserManager {
            call_count: AtomicU32::new(0),
        });
        let poster = Arc::new(MockPoster::new());
        let config = TargetLoopConfig {
            accounts: vec![String::from("alice"), String::from("bob")],
            ..default_config()
        };

        let target_loop = TargetLoop::new(
            Arc::new(MockFetcher {
                tweets: vec![test_tweet("tw1", "bob")],
            }),
            user_mgr.clone(),
            Arc::new(MockGenerator {
                reply: String::from("Nice!"),
            }),
            Arc::new(MockSafety::new(true)),
            Arc::new(MockTargetStorage::new()),
            poster.clone(),
            config,
        );

        let outcomes = target_loop.run_iteration().await.expect("should succeed");
        // Both accounts were looked up even though the first lookup failed.
        assert_eq!(user_mgr.call_count.load(Ordering::SeqCst), 2);
        assert_eq!(outcomes.len(), 1);
        assert!(matches!(
            outcomes.as_slice(),
            [TargetResult::Replied { .. }]
        ));
        assert_eq!(poster.sent_count(), 1);
    }
}