1use super::loop_helpers::{ContentLoopError, ContentSafety, ContentStorage, ThreadPoster};
8use super::schedule::{apply_slot_jitter, schedule_gate, ActiveSchedule};
9use super::scheduler::LoopScheduler;
10use rand::seq::SliceRandom;
11use rand::SeedableRng;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio_util::sync::CancellationToken;
15
16pub struct ThreadLoop {
18 generator: Arc<dyn ThreadGenerator>,
19 safety: Arc<dyn ContentSafety>,
20 storage: Arc<dyn ContentStorage>,
21 poster: Arc<dyn ThreadPoster>,
22 topics: Vec<String>,
23 thread_interval_secs: u64,
24 dry_run: bool,
25}
26
27#[async_trait::async_trait]
29pub trait ThreadGenerator: Send + Sync {
30 async fn generate_thread(
35 &self,
36 topic: &str,
37 count: Option<usize>,
38 ) -> Result<Vec<String>, ContentLoopError>;
39}
40
/// Outcome of a single attempt to generate and post a thread.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so callers
/// and tests can copy results and compare them directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ThreadResult {
    /// Every tweet of the thread was posted (or would have been, in dry-run).
    Posted {
        /// Topic the thread was written about.
        topic: String,
        /// Number of tweets in the thread.
        tweet_count: usize,
        /// Identifier of the stored thread record (`"dry-run"` in dry-run mode).
        thread_id: String,
    },
    /// Posting stopped part-way through the reply chain.
    PartialFailure {
        /// Topic the thread was written about.
        topic: String,
        /// Number of tweets that were posted before the failure.
        tweets_posted: usize,
        /// Number of tweets the thread was supposed to contain.
        total_tweets: usize,
        /// Text of the error returned by the poster.
        error: String,
    },
    /// The configured interval since the previous thread has not elapsed.
    TooSoon {
        /// Seconds since the previous thread.
        elapsed_secs: u64,
        /// Required minimum interval in seconds.
        interval_secs: u64,
    },
    /// The safety checker refused to allow another thread right now.
    RateLimited,
    /// No topic was supplied and none are configured.
    NoTopics,
    /// Generated content never passed validation.
    ValidationFailed {
        /// Description of the validation problem.
        error: String,
    },
    /// Generation or storage failed before anything was posted.
    Failed {
        /// Description of the failure.
        error: String,
    },
}
71
72impl ThreadLoop {
73 #[allow(clippy::too_many_arguments)]
75 pub fn new(
76 generator: Arc<dyn ThreadGenerator>,
77 safety: Arc<dyn ContentSafety>,
78 storage: Arc<dyn ContentStorage>,
79 poster: Arc<dyn ThreadPoster>,
80 topics: Vec<String>,
81 thread_interval_secs: u64,
82 dry_run: bool,
83 ) -> Self {
84 Self {
85 generator,
86 safety,
87 storage,
88 poster,
89 topics,
90 thread_interval_secs,
91 dry_run,
92 }
93 }
94
95 pub async fn run(
97 &self,
98 cancel: CancellationToken,
99 scheduler: LoopScheduler,
100 schedule: Option<Arc<ActiveSchedule>>,
101 ) {
102 let slot_mode = schedule
103 .as_ref()
104 .is_some_and(|s| s.has_thread_preferred_schedule());
105
106 tracing::info!(
107 dry_run = self.dry_run,
108 topics = self.topics.len(),
109 thread_interval_secs = self.thread_interval_secs,
110 slot_mode = slot_mode,
111 "Thread loop started"
112 );
113
114 if self.topics.is_empty() {
115 tracing::warn!("No topics configured, thread loop has nothing to post");
116 cancel.cancelled().await;
117 return;
118 }
119
120 let min_recent = 3usize;
121 let max_recent = (self.topics.len() / 2)
122 .max(min_recent)
123 .min(self.topics.len());
124 let mut recent_topics: Vec<String> = Vec::with_capacity(max_recent);
125 let mut rng = rand::rngs::StdRng::from_entropy();
126
127 loop {
128 if cancel.is_cancelled() {
129 break;
130 }
131
132 if !schedule_gate(&schedule, &cancel).await {
133 break;
134 }
135
136 if slot_mode {
137 let sched = schedule.as_ref().expect("slot_mode requires schedule");
138
139 match sched.next_thread_slot() {
140 Some(wait) => {
141 let jittered_wait = apply_slot_jitter(wait);
142 tracing::info!(
143 wait_secs = jittered_wait.as_secs(),
144 "Thread slot mode: sleeping until preferred thread time"
145 );
146
147 tokio::select! {
148 _ = cancel.cancelled() => break,
149 _ = tokio::time::sleep(jittered_wait) => {},
150 }
151
152 if cancel.is_cancelled() {
153 break;
154 }
155
156 if !self.safety.can_post_thread().await {
158 Self::log_thread_result(&ThreadResult::RateLimited, self.dry_run);
159 continue;
160 }
161
162 let topic = pick_topic(&self.topics, &mut recent_topics, &mut rng);
163 let result = self.generate_and_post(&topic, None).await;
164
165 if matches!(result, ThreadResult::Posted { .. }) {
166 if recent_topics.len() >= max_recent {
167 recent_topics.remove(0);
168 }
169 recent_topics.push(topic);
170 }
171
172 Self::log_thread_result(&result, self.dry_run);
173 }
174 None => {
175 tracing::warn!("Thread slot mode: no next slot found, sleeping 1 hour");
177 tokio::select! {
178 _ = cancel.cancelled() => break,
179 _ = tokio::time::sleep(Duration::from_secs(3600)) => {},
180 }
181 }
182 }
183 } else {
184 let result = self
186 .run_iteration(&mut recent_topics, max_recent, &mut rng)
187 .await;
188 Self::log_thread_result(&result, self.dry_run);
189
190 tokio::select! {
191 _ = cancel.cancelled() => break,
192 _ = scheduler.tick() => {},
193 }
194 }
195 }
196
197 tracing::info!("Thread loop stopped");
198 }
199
200 fn log_thread_result(result: &ThreadResult, dry_run: bool) {
202 match result {
203 ThreadResult::Posted {
204 topic, tweet_count, ..
205 } => {
206 tracing::info!(
207 topic = %topic,
208 tweets = tweet_count,
209 dry_run = dry_run,
210 "Thread iteration: thread posted"
211 );
212 }
213 ThreadResult::PartialFailure {
214 tweets_posted,
215 total_tweets,
216 error,
217 ..
218 } => {
219 tracing::warn!(
220 posted = tweets_posted,
221 total = total_tweets,
222 error = %error,
223 "Thread iteration: partial failure"
224 );
225 }
226 ThreadResult::TooSoon { .. } => {
227 tracing::debug!("Thread iteration: too soon since last thread");
228 }
229 ThreadResult::RateLimited => {
230 tracing::info!("Thread iteration: weekly thread limit reached");
231 }
232 ThreadResult::NoTopics => {
233 tracing::warn!("Thread iteration: no topics available");
234 }
235 ThreadResult::ValidationFailed { error } => {
236 tracing::warn!(error = %error, "Thread iteration: validation failed");
237 }
238 ThreadResult::Failed { error } => {
239 tracing::warn!(error = %error, "Thread iteration: failed");
240 }
241 }
242 }
243
244 pub async fn run_once(&self, topic: Option<&str>, count: Option<usize>) -> ThreadResult {
249 let chosen_topic = match topic {
250 Some(t) => t.to_string(),
251 None => {
252 if self.topics.is_empty() {
253 return ThreadResult::NoTopics;
254 }
255 let mut rng = rand::thread_rng();
256 self.topics
257 .choose(&mut rng)
258 .expect("topics is non-empty")
259 .clone()
260 }
261 };
262
263 let clamped_count = count.map(|c| c.clamp(2, 15));
265
266 if !self.safety.can_post_thread().await {
268 return ThreadResult::RateLimited;
269 }
270
271 self.generate_and_post(&chosen_topic, clamped_count).await
272 }
273
274 async fn run_iteration(
276 &self,
277 recent_topics: &mut Vec<String>,
278 max_recent: usize,
279 rng: &mut impl rand::Rng,
280 ) -> ThreadResult {
281 match self.storage.last_thread_time().await {
283 Ok(Some(last_time)) => {
284 let elapsed = chrono::Utc::now()
285 .signed_duration_since(last_time)
286 .num_seconds()
287 .max(0) as u64;
288
289 if elapsed < self.thread_interval_secs {
290 return ThreadResult::TooSoon {
291 elapsed_secs: elapsed,
292 interval_secs: self.thread_interval_secs,
293 };
294 }
295 }
296 Ok(None) => {
297 }
299 Err(e) => {
300 tracing::warn!(error = %e, "Failed to query last thread time, proceeding anyway");
301 }
302 }
303
304 if !self.safety.can_post_thread().await {
306 return ThreadResult::RateLimited;
307 }
308
309 let topic = pick_topic(&self.topics, recent_topics, rng);
311
312 let result = self.generate_and_post(&topic, None).await;
313
314 if matches!(result, ThreadResult::Posted { .. }) {
316 if recent_topics.len() >= max_recent {
317 recent_topics.remove(0);
318 }
319 recent_topics.push(topic);
320 }
321
322 result
323 }
324
325 async fn generate_and_post(&self, topic: &str, count: Option<usize>) -> ThreadResult {
327 tracing::info!(topic = %topic, "Generating thread on topic");
328
329 let tweets = match self.generate_with_validation(topic, count).await {
331 Ok(tweets) => tweets,
332 Err(result) => return result,
333 };
334
335 let tweet_count = tweets.len();
336
337 if self.dry_run {
338 tracing::info!(
339 "DRY RUN: Would post thread on topic '{}' ({} tweets):",
340 topic,
341 tweet_count
342 );
343
344 for (i, tweet) in tweets.iter().enumerate() {
345 tracing::info!(
346 " {}/{}: \"{}\" ({} chars)",
347 i + 1,
348 tweet_count,
349 tweet,
350 tweet.len()
351 );
352 }
353
354 let _ = self
355 .storage
356 .log_action(
357 "thread",
358 "dry_run",
359 &format!("Topic '{}': {} tweets", topic, tweet_count),
360 )
361 .await;
362
363 return ThreadResult::Posted {
364 topic: topic.to_string(),
365 tweet_count,
366 thread_id: "dry-run".to_string(),
367 };
368 }
369
370 let thread_id = match self.storage.create_thread(topic, tweet_count).await {
372 Ok(id) => id,
373 Err(e) => {
374 tracing::error!(error = %e, "Failed to create thread record");
375 return ThreadResult::Failed {
376 error: format!("Storage error: {e}"),
377 };
378 }
379 };
380
381 let result = self.post_reply_chain(&thread_id, &tweets, topic).await;
383
384 let (status, message) = match &result {
386 ThreadResult::Posted { tweet_count, .. } => (
387 "success",
388 format!("Topic '{}': {} tweets posted", topic, tweet_count),
389 ),
390 ThreadResult::PartialFailure {
391 tweets_posted,
392 total_tweets,
393 error,
394 ..
395 } => (
396 "partial",
397 format!(
398 "Topic '{}': {}/{} tweets posted, error: {}",
399 topic, tweets_posted, total_tweets, error
400 ),
401 ),
402 _ => ("failure", format!("Topic '{}': unexpected state", topic)),
403 };
404 let _ = self.storage.log_action("thread", status, &message).await;
405
406 result
407 }
408
409 async fn generate_with_validation(
411 &self,
412 topic: &str,
413 count: Option<usize>,
414 ) -> Result<Vec<String>, ThreadResult> {
415 let max_retries = 3;
416
417 for attempt in 0..max_retries {
418 let effective_topic = if attempt == 0 {
419 topic.to_string()
420 } else {
421 format!("{topic} (IMPORTANT: each tweet MUST be under 280 characters)")
422 };
423
424 let tweets = match self
425 .generator
426 .generate_thread(&effective_topic, count)
427 .await
428 {
429 Ok(t) => t,
430 Err(e) => {
431 return Err(ThreadResult::Failed {
432 error: format!("Generation failed: {e}"),
433 });
434 }
435 };
436
437 let all_valid = tweets.iter().all(|t| t.len() <= 280);
439 if all_valid {
440 return Ok(tweets);
441 }
442
443 let over_limit: Vec<usize> = tweets
444 .iter()
445 .enumerate()
446 .filter(|(_, t)| t.len() > 280)
447 .map(|(i, _)| i + 1)
448 .collect();
449
450 tracing::debug!(
451 attempt = attempt + 1,
452 over_limit = ?over_limit,
453 "Thread tweets exceed 280 chars, retrying"
454 );
455 }
456
457 Err(ThreadResult::ValidationFailed {
458 error: format!(
459 "Thread tweets still exceed 280 characters after {max_retries} attempts"
460 ),
461 })
462 }
463
464 async fn post_reply_chain(
467 &self,
468 thread_id: &str,
469 tweets: &[String],
470 topic: &str,
471 ) -> ThreadResult {
472 let total = tweets.len();
473 let mut previous_tweet_id: Option<String> = None;
474 let mut root_tweet_id: Option<String> = None;
475
476 for (i, tweet_content) in tweets.iter().enumerate() {
477 let post_result = if i == 0 {
478 self.poster.post_tweet(tweet_content).await
480 } else {
481 let prev_id = previous_tweet_id
483 .as_ref()
484 .expect("previous_tweet_id set after first tweet");
485 self.poster.reply_to_tweet(prev_id, tweet_content).await
486 };
487
488 match post_result {
489 Ok(new_tweet_id) => {
490 tracing::info!(
491 position = i + 1,
492 total = total,
493 "Posted thread tweet {}/{}",
494 i + 1,
495 total,
496 );
497 if i == 0 {
498 root_tweet_id = Some(new_tweet_id.clone());
499 let _ = self
501 .storage
502 .update_thread_status(thread_id, "posting", i + 1, Some(&new_tweet_id))
503 .await;
504 }
505
506 let _ = self
508 .storage
509 .store_thread_tweet(thread_id, i, &new_tweet_id, tweet_content)
510 .await;
511
512 previous_tweet_id = Some(new_tweet_id);
513
514 if i < total - 1 {
516 let delay = Duration::from_secs(1)
517 + Duration::from_millis(rand::random::<u64>() % 2000);
518 tokio::time::sleep(delay).await;
519 }
520 }
521 Err(e) => {
522 tracing::error!(
523 thread_id = %thread_id,
524 tweet_index = i,
525 error = %e,
526 "Failed to post tweet {}/{} in thread",
527 i + 1,
528 total
529 );
530
531 let _ = self
533 .storage
534 .update_thread_status(thread_id, "partial", i, root_tweet_id.as_deref())
535 .await;
536
537 return ThreadResult::PartialFailure {
538 topic: topic.to_string(),
539 tweets_posted: i,
540 total_tweets: total,
541 error: e.to_string(),
542 };
543 }
544 }
545 }
546
547 let _ = self
549 .storage
550 .update_thread_status(thread_id, "sent", total, root_tweet_id.as_deref())
551 .await;
552
553 ThreadResult::Posted {
554 topic: topic.to_string(),
555 tweet_count: total,
556 thread_id: thread_id.to_string(),
557 }
558 }
559}
560
561fn pick_topic(topics: &[String], recent: &mut Vec<String>, rng: &mut impl rand::Rng) -> String {
563 let available: Vec<&String> = topics.iter().filter(|t| !recent.contains(t)).collect();
564
565 if available.is_empty() {
566 recent.clear();
567 topics.choose(rng).expect("topics is non-empty").clone()
568 } else {
569 available
570 .choose(rng)
571 .expect("available is non-empty")
572 .to_string()
573 }
574}
575
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Interval used by every test: one week, in seconds.
    const WEEK_SECS: u64 = 604800;

    // --- Generator doubles -------------------------------------------------

    /// Generator that always returns the same canned tweets.
    struct MockThreadGenerator {
        tweets: Vec<String>,
    }

    #[async_trait::async_trait]
    impl ThreadGenerator for MockThreadGenerator {
        async fn generate_thread(
            &self,
            _topic: &str,
            _count: Option<usize>,
        ) -> Result<Vec<String>, ContentLoopError> {
            Ok(self.tweets.clone())
        }
    }

    /// Generator whose tweets are always longer than the 280 limit.
    struct OverlongThreadGenerator;

    #[async_trait::async_trait]
    impl ThreadGenerator for OverlongThreadGenerator {
        async fn generate_thread(
            &self,
            _topic: &str,
            _count: Option<usize>,
        ) -> Result<Vec<String>, ContentLoopError> {
            Ok(vec!["a".repeat(300), "b".repeat(300)])
        }
    }

    /// Generator that always errors.
    struct FailingThreadGenerator;

    #[async_trait::async_trait]
    impl ThreadGenerator for FailingThreadGenerator {
        async fn generate_thread(
            &self,
            _topic: &str,
            _count: Option<usize>,
        ) -> Result<Vec<String>, ContentLoopError> {
            Err(ContentLoopError::LlmFailure("model error".to_string()))
        }
    }

    // --- Safety double -----------------------------------------------------

    /// Safety checker with fixed answers.
    struct MockSafety {
        can_tweet: bool,
        can_thread: bool,
    }

    #[async_trait::async_trait]
    impl ContentSafety for MockSafety {
        async fn can_post_tweet(&self) -> bool {
            self.can_tweet
        }

        async fn can_post_thread(&self) -> bool {
            self.can_thread
        }
    }

    // --- Storage double ----------------------------------------------------

    /// In-memory storage that records every call it receives.
    struct MockStorage {
        last_thread: Mutex<Option<chrono::DateTime<chrono::Utc>>>,
        threads: Mutex<Vec<(String, usize)>>,
        thread_statuses: Mutex<Vec<(String, String, usize)>>,
        thread_tweets: Mutex<Vec<(String, usize, String, String)>>,
        actions: Mutex<Vec<(String, String, String)>>,
    }

    impl MockStorage {
        fn new(last_thread: Option<chrono::DateTime<chrono::Utc>>) -> Self {
            Self {
                last_thread: Mutex::new(last_thread),
                threads: Mutex::new(Vec::new()),
                thread_statuses: Mutex::new(Vec::new()),
                thread_tweets: Mutex::new(Vec::new()),
                actions: Mutex::new(Vec::new()),
            }
        }

        /// Number of per-tweet records stored so far.
        fn thread_tweet_count(&self) -> usize {
            let stored = self.thread_tweets.lock().expect("lock");
            stored.len()
        }

        /// Status column of every logged action, in call order.
        fn action_statuses(&self) -> Vec<String> {
            let logged = self.actions.lock().expect("lock");
            let mut statuses = Vec::with_capacity(logged.len());
            for (_, status, _) in logged.iter() {
                statuses.push(status.clone());
            }
            statuses
        }
    }

    #[async_trait::async_trait]
    impl ContentStorage for MockStorage {
        async fn last_tweet_time(
            &self,
        ) -> Result<Option<chrono::DateTime<chrono::Utc>>, ContentLoopError> {
            Ok(None)
        }

        async fn last_thread_time(
            &self,
        ) -> Result<Option<chrono::DateTime<chrono::Utc>>, ContentLoopError> {
            Ok(*self.last_thread.lock().expect("lock"))
        }

        async fn todays_tweet_times(
            &self,
        ) -> Result<Vec<chrono::DateTime<chrono::Utc>>, ContentLoopError> {
            Ok(Vec::new())
        }

        async fn post_tweet(&self, _topic: &str, _content: &str) -> Result<(), ContentLoopError> {
            Ok(())
        }

        async fn create_thread(
            &self,
            topic: &str,
            tweet_count: usize,
        ) -> Result<String, ContentLoopError> {
            let mut threads = self.threads.lock().expect("lock");
            let id = format!("thread-{}", threads.len() + 1);
            threads.push((topic.to_string(), tweet_count));
            Ok(id)
        }

        async fn update_thread_status(
            &self,
            thread_id: &str,
            status: &str,
            tweet_count: usize,
            _root_tweet_id: Option<&str>,
        ) -> Result<(), ContentLoopError> {
            let entry = (thread_id.to_string(), status.to_string(), tweet_count);
            self.thread_statuses.lock().expect("lock").push(entry);
            Ok(())
        }

        async fn store_thread_tweet(
            &self,
            thread_id: &str,
            position: usize,
            tweet_id: &str,
            content: &str,
        ) -> Result<(), ContentLoopError> {
            let entry = (
                thread_id.to_string(),
                position,
                tweet_id.to_string(),
                content.to_string(),
            );
            self.thread_tweets.lock().expect("lock").push(entry);
            Ok(())
        }

        async fn log_action(
            &self,
            action_type: &str,
            status: &str,
            message: &str,
        ) -> Result<(), ContentLoopError> {
            let entry = (
                action_type.to_string(),
                status.to_string(),
                message.to_string(),
            );
            self.actions.lock().expect("lock").push(entry);
            Ok(())
        }
    }

    // --- Poster double -----------------------------------------------------

    /// Poster that records `(in_reply_to, content)` pairs and can be told to
    /// fail once a given number of tweets has been accepted.
    struct MockPoster {
        posted: Mutex<Vec<(Option<String>, String)>>,
        fail_at_index: Option<usize>,
    }

    impl MockPoster {
        fn new() -> Self {
            Self {
                posted: Mutex::new(Vec::new()),
                fail_at_index: None,
            }
        }

        fn failing_at(index: usize) -> Self {
            Self {
                posted: Mutex::new(Vec::new()),
                fail_at_index: Some(index),
            }
        }

        fn posted_count(&self) -> usize {
            self.posted.lock().expect("lock").len()
        }

        /// Shared body of both posting methods: fail if the configured index
        /// is reached, otherwise record the tweet and hand out the next id.
        fn accept(&self, in_reply_to: Option<&str>, content: &str) -> Result<String, ContentLoopError> {
            let mut posted = self.posted.lock().expect("lock");
            if self.fail_at_index == Some(posted.len()) {
                return Err(ContentLoopError::PostFailed("API error".to_string()));
            }
            let id = format!("tweet-{}", posted.len() + 1);
            posted.push((in_reply_to.map(str::to_string), content.to_string()));
            Ok(id)
        }
    }

    #[async_trait::async_trait]
    impl ThreadPoster for MockPoster {
        async fn post_tweet(&self, content: &str) -> Result<String, ContentLoopError> {
            self.accept(None, content)
        }

        async fn reply_to_tweet(
            &self,
            in_reply_to: &str,
            content: &str,
        ) -> Result<String, ContentLoopError> {
            self.accept(Some(in_reply_to), content)
        }
    }

    // --- Fixtures ----------------------------------------------------------

    fn make_topics() -> Vec<String> {
        ["Rust", "CLI tools", "Open source"]
            .iter()
            .map(|topic| topic.to_string())
            .collect()
    }

    fn make_thread_tweets() -> Vec<String> {
        [
            "Thread on Rust: Let me share what I've learned...",
            "First, the ownership model is game-changing.",
            "Second, pattern matching makes error handling elegant.",
            "Third, the compiler is your best friend.",
            "Finally, the community is incredibly welcoming.",
        ]
        .iter()
        .map(|tweet| tweet.to_string())
        .collect()
    }

    /// Generator returning the given tweets on every call.
    fn canned(tweets: Vec<String>) -> Arc<MockThreadGenerator> {
        Arc::new(MockThreadGenerator { tweets })
    }

    /// Assembles a `ThreadLoop` over the doubles with a one-week interval.
    /// Tweets are always allowed; `can_thread` controls the thread check.
    fn thread_loop_with(
        generator: Arc<dyn ThreadGenerator>,
        can_thread: bool,
        storage: Arc<MockStorage>,
        poster: Arc<MockPoster>,
        topics: Vec<String>,
        dry_run: bool,
    ) -> ThreadLoop {
        ThreadLoop::new(
            generator,
            Arc::new(MockSafety {
                can_tweet: true,
                can_thread,
            }),
            storage,
            poster,
            topics,
            WEEK_SECS,
            dry_run,
        )
    }

    // --- Tests -------------------------------------------------------------

    #[tokio::test]
    async fn run_once_posts_thread() {
        let storage = Arc::new(MockStorage::new(None));
        let poster = Arc::new(MockPoster::new());
        let thread_loop = thread_loop_with(
            canned(make_thread_tweets()),
            true,
            storage.clone(),
            poster.clone(),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        assert!(matches!(
            result,
            ThreadResult::Posted { tweet_count: 5, .. }
        ));
        assert_eq!(poster.posted_count(), 5);
        assert_eq!(storage.thread_tweet_count(), 5);
    }

    #[tokio::test]
    async fn run_once_dry_run_does_not_post() {
        let storage = Arc::new(MockStorage::new(None));
        let poster = Arc::new(MockPoster::new());
        let thread_loop = thread_loop_with(
            canned(make_thread_tweets()),
            true,
            storage.clone(),
            poster.clone(),
            make_topics(),
            true,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        assert!(matches!(result, ThreadResult::Posted { .. }));
        // Nothing reaches the poster; only the dry-run action is logged.
        assert_eq!(poster.posted_count(), 0);
        assert_eq!(storage.action_statuses(), vec!["dry_run"]);
    }

    #[tokio::test]
    async fn run_once_rate_limited() {
        let thread_loop = thread_loop_with(
            canned(make_thread_tweets()),
            false,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(None, None).await;

        assert!(matches!(result, ThreadResult::RateLimited));
    }

    #[tokio::test]
    async fn run_once_no_topics() {
        let thread_loop = thread_loop_with(
            canned(make_thread_tweets()),
            true,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            Vec::new(),
            false,
        );

        let result = thread_loop.run_once(None, None).await;

        assert!(matches!(result, ThreadResult::NoTopics));
    }

    #[tokio::test]
    async fn run_once_generation_failure() {
        let thread_loop = thread_loop_with(
            Arc::new(FailingThreadGenerator),
            true,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        assert!(matches!(result, ThreadResult::Failed { .. }));
    }

    #[tokio::test]
    async fn run_once_validation_failure() {
        let thread_loop = thread_loop_with(
            Arc::new(OverlongThreadGenerator),
            true,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        assert!(matches!(result, ThreadResult::ValidationFailed { .. }));
    }

    #[tokio::test]
    async fn partial_failure_records_correctly() {
        let storage = Arc::new(MockStorage::new(None));
        // The poster rejects the third tweet (two already accepted).
        let poster = Arc::new(MockPoster::failing_at(2));
        let thread_loop = thread_loop_with(
            canned(make_thread_tweets()),
            true,
            storage.clone(),
            poster.clone(),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        match result {
            ThreadResult::PartialFailure {
                tweets_posted,
                total_tweets,
                ..
            } => {
                assert_eq!(tweets_posted, 2);
                assert_eq!(total_tweets, 5);
            }
            other => panic!("Expected PartialFailure, got {other:?}"),
        }

        assert_eq!(storage.thread_tweet_count(), 2);
        assert_eq!(poster.posted_count(), 2);
    }

    #[tokio::test]
    async fn run_once_clamps_count() {
        let poster = Arc::new(MockPoster::new());
        let storage = Arc::new(MockStorage::new(None));
        let tweets: Vec<String> = (1..=3).map(|n| format!("Tweet {n}")).collect();
        let thread_loop = thread_loop_with(
            canned(tweets),
            true,
            storage,
            poster.clone(),
            make_topics(),
            false,
        );

        // A requested count of 1 is below the minimum and gets clamped.
        let result = thread_loop.run_once(Some("Rust"), Some(1)).await;

        assert!(matches!(result, ThreadResult::Posted { .. }));
    }

    #[tokio::test]
    async fn run_iteration_skips_when_too_soon() {
        let three_days_ago = chrono::Utc::now() - chrono::Duration::days(3);
        let thread_loop = thread_loop_with(
            canned(make_thread_tweets()),
            true,
            Arc::new(MockStorage::new(Some(three_days_ago))),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let mut recent = Vec::new();
        let mut rng = rand::thread_rng();
        let result = thread_loop.run_iteration(&mut recent, 3, &mut rng).await;

        assert!(matches!(result, ThreadResult::TooSoon { .. }));
    }

    #[tokio::test]
    async fn run_iteration_posts_when_interval_elapsed() {
        let eight_days_ago = chrono::Utc::now() - chrono::Duration::days(8);
        let poster = Arc::new(MockPoster::new());
        let thread_loop = thread_loop_with(
            canned(make_thread_tweets()),
            true,
            Arc::new(MockStorage::new(Some(eight_days_ago))),
            poster.clone(),
            make_topics(),
            false,
        );

        let mut recent = Vec::new();
        let mut rng = rand::thread_rng();
        let result = thread_loop.run_iteration(&mut recent, 3, &mut rng).await;

        assert!(matches!(result, ThreadResult::Posted { .. }));
        assert_eq!(poster.posted_count(), 5);
        // The posted topic is remembered as recently used.
        assert_eq!(recent.len(), 1);
    }

    #[tokio::test]
    async fn reply_chain_structure_correct() {
        let poster = Arc::new(MockPoster::new());
        let storage = Arc::new(MockStorage::new(None));
        let tweets = vec![
            "First".to_string(),
            "Second".to_string(),
            "Third".to_string(),
        ];
        let thread_loop = thread_loop_with(
            canned(tweets),
            true,
            storage,
            poster.clone(),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        assert!(matches!(
            result,
            ThreadResult::Posted { tweet_count: 3, .. }
        ));

        let posted = poster.posted.lock().expect("lock");
        // Root tweet replies to nothing; each later tweet replies to the one before.
        assert_eq!(posted[0].0, None);
        assert_eq!(posted[1].0, Some("tweet-1".to_string()));
        assert_eq!(posted[2].0, Some("tweet-2".to_string()));
    }
}