1use super::loop_helpers::{ContentLoopError, ContentSafety, ContentStorage, ThreadPoster};
8use super::schedule::{apply_slot_jitter, schedule_gate, ActiveSchedule};
9use super::scheduler::LoopScheduler;
10use rand::seq::SliceRandom;
11use rand::SeedableRng;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio_util::sync::CancellationToken;
15
16pub struct ThreadLoop {
18 generator: Arc<dyn ThreadGenerator>,
19 safety: Arc<dyn ContentSafety>,
20 storage: Arc<dyn ContentStorage>,
21 poster: Arc<dyn ThreadPoster>,
22 topics: Vec<String>,
23 thread_interval_secs: u64,
24 dry_run: bool,
25}
26
27#[async_trait::async_trait]
29pub trait ThreadGenerator: Send + Sync {
30 async fn generate_thread(
35 &self,
36 topic: &str,
37 count: Option<usize>,
38 ) -> Result<Vec<String>, ContentLoopError>;
39}
40
/// Outcome of one attempt to generate and post a thread.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so callers and tests
/// can store and compare outcomes directly instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ThreadResult {
    /// Every tweet of the thread was posted (in dry-run mode `thread_id` is `"dry-run"`).
    Posted {
        /// Topic the thread was written about.
        topic: String,
        /// Number of tweets in the thread.
        tweet_count: usize,
        /// Identifier of the stored thread record.
        thread_id: String,
    },
    /// Posting stopped part-way through the reply chain.
    PartialFailure {
        /// Topic the thread was written about.
        topic: String,
        /// Number of tweets posted before the failure.
        tweets_posted: usize,
        /// Number of tweets the thread was supposed to contain.
        total_tweets: usize,
        /// Display form of the posting error.
        error: String,
    },
    /// The configured interval since the last thread has not elapsed yet.
    TooSoon {
        /// Seconds since the last thread.
        elapsed_secs: u64,
        /// Required seconds between threads.
        interval_secs: u64,
    },
    /// The safety gate refused a new thread.
    RateLimited,
    /// No topic was supplied and none are configured.
    NoTopics,
    /// Generated tweets failed validation on every attempt.
    ValidationFailed { error: String },
    /// Generation or thread-record creation failed.
    Failed { error: String },
}
71
72impl ThreadLoop {
73 #[allow(clippy::too_many_arguments)]
75 pub fn new(
76 generator: Arc<dyn ThreadGenerator>,
77 safety: Arc<dyn ContentSafety>,
78 storage: Arc<dyn ContentStorage>,
79 poster: Arc<dyn ThreadPoster>,
80 topics: Vec<String>,
81 thread_interval_secs: u64,
82 dry_run: bool,
83 ) -> Self {
84 Self {
85 generator,
86 safety,
87 storage,
88 poster,
89 topics,
90 thread_interval_secs,
91 dry_run,
92 }
93 }
94
95 pub async fn run(
97 &self,
98 cancel: CancellationToken,
99 scheduler: LoopScheduler,
100 schedule: Option<Arc<ActiveSchedule>>,
101 ) {
102 let slot_mode = schedule
103 .as_ref()
104 .is_some_and(|s| s.has_thread_preferred_schedule());
105
106 tracing::info!(
107 dry_run = self.dry_run,
108 topics = self.topics.len(),
109 thread_interval_secs = self.thread_interval_secs,
110 slot_mode = slot_mode,
111 "Thread loop started"
112 );
113
114 if self.topics.is_empty() {
115 tracing::warn!("No topics configured, thread loop has nothing to post");
116 cancel.cancelled().await;
117 return;
118 }
119
120 let min_recent = 3usize;
121 let max_recent = (self.topics.len() / 2)
122 .max(min_recent)
123 .min(self.topics.len());
124 let mut recent_topics: Vec<String> = Vec::with_capacity(max_recent);
125 let mut rng = rand::rngs::StdRng::from_entropy();
126
127 loop {
128 if cancel.is_cancelled() {
129 break;
130 }
131
132 if !schedule_gate(&schedule, &cancel).await {
133 break;
134 }
135
136 if slot_mode {
137 let sched = schedule.as_ref().expect("slot_mode requires schedule");
138
139 match sched.next_thread_slot() {
140 Some(wait) => {
141 let jittered_wait = apply_slot_jitter(wait);
142 tracing::info!(
143 wait_secs = jittered_wait.as_secs(),
144 "Thread slot mode: sleeping until preferred thread time"
145 );
146
147 tokio::select! {
148 _ = cancel.cancelled() => break,
149 _ = tokio::time::sleep(jittered_wait) => {},
150 }
151
152 if cancel.is_cancelled() {
153 break;
154 }
155
156 if !self.safety.can_post_thread().await {
158 Self::log_thread_result(&ThreadResult::RateLimited, self.dry_run);
159 continue;
160 }
161
162 let topic = pick_topic(&self.topics, &mut recent_topics, &mut rng);
163 let result = self.generate_and_post(&topic, None).await;
164
165 if matches!(result, ThreadResult::Posted { .. }) {
166 if recent_topics.len() >= max_recent {
167 recent_topics.remove(0);
168 }
169 recent_topics.push(topic);
170 }
171
172 Self::log_thread_result(&result, self.dry_run);
173 }
174 None => {
175 tracing::warn!("Thread slot mode: no next slot found, sleeping 1 hour");
177 tokio::select! {
178 _ = cancel.cancelled() => break,
179 _ = tokio::time::sleep(Duration::from_secs(3600)) => {},
180 }
181 }
182 }
183 } else {
184 let result = self
186 .run_iteration(&mut recent_topics, max_recent, &mut rng)
187 .await;
188 Self::log_thread_result(&result, self.dry_run);
189
190 tokio::select! {
191 _ = cancel.cancelled() => break,
192 _ = scheduler.tick() => {},
193 }
194 }
195 }
196
197 tracing::info!("Thread loop stopped");
198 }
199
200 fn log_thread_result(result: &ThreadResult, dry_run: bool) {
202 match result {
203 ThreadResult::Posted {
204 topic, tweet_count, ..
205 } => {
206 tracing::info!(
207 topic = %topic,
208 tweets = tweet_count,
209 dry_run = dry_run,
210 "Thread iteration: thread posted"
211 );
212 }
213 ThreadResult::PartialFailure {
214 tweets_posted,
215 total_tweets,
216 error,
217 ..
218 } => {
219 tracing::warn!(
220 posted = tweets_posted,
221 total = total_tweets,
222 error = %error,
223 "Thread iteration: partial failure"
224 );
225 }
226 ThreadResult::TooSoon { .. } => {
227 tracing::debug!("Thread iteration: too soon since last thread");
228 }
229 ThreadResult::RateLimited => {
230 tracing::info!("Thread iteration: weekly thread limit reached");
231 }
232 ThreadResult::NoTopics => {
233 tracing::warn!("Thread iteration: no topics available");
234 }
235 ThreadResult::ValidationFailed { error } => {
236 tracing::warn!(error = %error, "Thread iteration: validation failed");
237 }
238 ThreadResult::Failed { error } => {
239 tracing::warn!(error = %error, "Thread iteration: failed");
240 }
241 }
242 }
243
244 pub async fn run_once(&self, topic: Option<&str>, count: Option<usize>) -> ThreadResult {
249 let chosen_topic = match topic {
250 Some(t) => t.to_string(),
251 None => {
252 if self.topics.is_empty() {
253 return ThreadResult::NoTopics;
254 }
255 let mut rng = rand::thread_rng();
256 self.topics
257 .choose(&mut rng)
258 .expect("topics is non-empty")
259 .clone()
260 }
261 };
262
263 let clamped_count = count.map(|c| c.clamp(2, 15));
265
266 if !self.safety.can_post_thread().await {
268 return ThreadResult::RateLimited;
269 }
270
271 self.generate_and_post(&chosen_topic, clamped_count).await
272 }
273
274 async fn run_iteration(
276 &self,
277 recent_topics: &mut Vec<String>,
278 max_recent: usize,
279 rng: &mut impl rand::Rng,
280 ) -> ThreadResult {
281 match self.storage.last_thread_time().await {
283 Ok(Some(last_time)) => {
284 let elapsed = chrono::Utc::now()
285 .signed_duration_since(last_time)
286 .num_seconds()
287 .max(0) as u64;
288
289 if elapsed < self.thread_interval_secs {
290 return ThreadResult::TooSoon {
291 elapsed_secs: elapsed,
292 interval_secs: self.thread_interval_secs,
293 };
294 }
295 }
296 Ok(None) => {
297 }
299 Err(e) => {
300 tracing::warn!(error = %e, "Failed to query last thread time, proceeding anyway");
301 }
302 }
303
304 if !self.safety.can_post_thread().await {
306 return ThreadResult::RateLimited;
307 }
308
309 let topic = pick_topic(&self.topics, recent_topics, rng);
311
312 let result = self.generate_and_post(&topic, None).await;
313
314 if matches!(result, ThreadResult::Posted { .. }) {
316 if recent_topics.len() >= max_recent {
317 recent_topics.remove(0);
318 }
319 recent_topics.push(topic);
320 }
321
322 result
323 }
324
325 async fn generate_and_post(&self, topic: &str, count: Option<usize>) -> ThreadResult {
327 tracing::info!(topic = %topic, "Generating thread on topic");
328
329 let tweets = match self.generate_with_validation(topic, count).await {
331 Ok(tweets) => tweets,
332 Err(result) => return result,
333 };
334
335 let tweet_count = tweets.len();
336
337 if self.dry_run {
338 tracing::info!(
339 "DRY RUN: Would post thread on topic '{}' ({} tweets):",
340 topic,
341 tweet_count
342 );
343
344 for (i, tweet) in tweets.iter().enumerate() {
345 tracing::info!(
346 " {}/{}: \"{}\" ({} chars)",
347 i + 1,
348 tweet_count,
349 tweet,
350 tweet.len()
351 );
352 }
353
354 let _ = self
355 .storage
356 .log_action(
357 "thread",
358 "dry_run",
359 &format!("Topic '{}': {} tweets", topic, tweet_count),
360 )
361 .await;
362
363 return ThreadResult::Posted {
364 topic: topic.to_string(),
365 tweet_count,
366 thread_id: "dry-run".to_string(),
367 };
368 }
369
370 let thread_id = match self.storage.create_thread(topic, tweet_count).await {
372 Ok(id) => id,
373 Err(e) => {
374 tracing::error!(error = %e, "Failed to create thread record");
375 return ThreadResult::Failed {
376 error: format!("Storage error: {e}"),
377 };
378 }
379 };
380
381 let result = self.post_reply_chain(&thread_id, &tweets, topic).await;
383
384 let (status, message) = match &result {
386 ThreadResult::Posted { tweet_count, .. } => (
387 "success",
388 format!("Topic '{}': {} tweets posted", topic, tweet_count),
389 ),
390 ThreadResult::PartialFailure {
391 tweets_posted,
392 total_tweets,
393 error,
394 ..
395 } => (
396 "partial",
397 format!(
398 "Topic '{}': {}/{} tweets posted, error: {}",
399 topic, tweets_posted, total_tweets, error
400 ),
401 ),
402 _ => ("failure", format!("Topic '{}': unexpected state", topic)),
403 };
404 let _ = self.storage.log_action("thread", status, &message).await;
405
406 result
407 }
408
409 async fn generate_with_validation(
411 &self,
412 topic: &str,
413 count: Option<usize>,
414 ) -> Result<Vec<String>, ThreadResult> {
415 let max_retries = 3;
416
417 for attempt in 0..max_retries {
418 let effective_topic = if attempt == 0 {
419 topic.to_string()
420 } else {
421 format!("{topic} (IMPORTANT: each tweet MUST be under 280 characters)")
422 };
423
424 let tweets = match self
425 .generator
426 .generate_thread(&effective_topic, count)
427 .await
428 {
429 Ok(t) => t,
430 Err(e) => {
431 return Err(ThreadResult::Failed {
432 error: format!("Generation failed: {e}"),
433 });
434 }
435 };
436
437 let all_valid = tweets.iter().all(|t| {
439 crate::content::length::tweet_weighted_len(t)
440 <= crate::content::length::MAX_TWEET_CHARS
441 });
442 if all_valid {
443 return Ok(tweets);
444 }
445
446 let over_limit: Vec<usize> = tweets
447 .iter()
448 .enumerate()
449 .filter(|(_, t)| {
450 crate::content::length::tweet_weighted_len(t)
451 > crate::content::length::MAX_TWEET_CHARS
452 })
453 .map(|(i, _)| i + 1)
454 .collect();
455
456 tracing::debug!(
457 attempt = attempt + 1,
458 over_limit = ?over_limit,
459 "Thread tweets exceed 280 chars, retrying"
460 );
461 }
462
463 Err(ThreadResult::ValidationFailed {
464 error: format!(
465 "Thread tweets still exceed 280 characters after {max_retries} attempts"
466 ),
467 })
468 }
469
470 async fn post_reply_chain(
473 &self,
474 thread_id: &str,
475 tweets: &[String],
476 topic: &str,
477 ) -> ThreadResult {
478 let total = tweets.len();
479 let mut previous_tweet_id: Option<String> = None;
480 let mut root_tweet_id: Option<String> = None;
481
482 for (i, tweet_content) in tweets.iter().enumerate() {
483 let post_result = if i == 0 {
484 self.poster.post_tweet(tweet_content).await
486 } else {
487 let prev_id = previous_tweet_id
489 .as_ref()
490 .expect("previous_tweet_id set after first tweet");
491 self.poster.reply_to_tweet(prev_id, tweet_content).await
492 };
493
494 match post_result {
495 Ok(new_tweet_id) => {
496 tracing::info!(
497 position = i + 1,
498 total = total,
499 "Posted thread tweet {}/{}",
500 i + 1,
501 total,
502 );
503 if i == 0 {
504 root_tweet_id = Some(new_tweet_id.clone());
505 let _ = self
507 .storage
508 .update_thread_status(thread_id, "posting", i + 1, Some(&new_tweet_id))
509 .await;
510 }
511
512 let _ = self
514 .storage
515 .store_thread_tweet(thread_id, i, &new_tweet_id, tweet_content)
516 .await;
517
518 previous_tweet_id = Some(new_tweet_id);
519
520 if i < total - 1 {
522 let delay = Duration::from_secs(1)
523 + Duration::from_millis(rand::random::<u64>() % 2000);
524 tokio::time::sleep(delay).await;
525 }
526 }
527 Err(e) => {
528 tracing::error!(
529 thread_id = %thread_id,
530 tweet_index = i,
531 error = %e,
532 "Failed to post tweet {}/{} in thread",
533 i + 1,
534 total
535 );
536
537 let _ = self
539 .storage
540 .update_thread_status(thread_id, "partial", i, root_tweet_id.as_deref())
541 .await;
542
543 return ThreadResult::PartialFailure {
544 topic: topic.to_string(),
545 tweets_posted: i,
546 total_tweets: total,
547 error: e.to_string(),
548 };
549 }
550 }
551 }
552
553 let _ = self
555 .storage
556 .update_thread_status(thread_id, "sent", total, root_tweet_id.as_deref())
557 .await;
558
559 ThreadResult::Posted {
560 topic: topic.to_string(),
561 tweet_count: total,
562 thread_id: thread_id.to_string(),
563 }
564 }
565}
566
567fn pick_topic(topics: &[String], recent: &mut Vec<String>, rng: &mut impl rand::Rng) -> String {
569 let available: Vec<&String> = topics.iter().filter(|t| !recent.contains(t)).collect();
570
571 if available.is_empty() {
572 recent.clear();
573 topics.choose(rng).expect("topics is non-empty").clone()
574 } else {
575 available
576 .choose(rng)
577 .expect("available is non-empty")
578 .to_string()
579 }
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Thread interval configured by every test (604800 seconds, i.e. seven days).
    const WEEK_SECS: u64 = 604800;

    /// Generator that returns the same canned tweets for every request.
    struct MockThreadGenerator {
        tweets: Vec<String>,
    }

    #[async_trait::async_trait]
    impl ThreadGenerator for MockThreadGenerator {
        async fn generate_thread(
            &self,
            _topic: &str,
            _count: Option<usize>,
        ) -> Result<Vec<String>, ContentLoopError> {
            Ok(self.tweets.clone())
        }
    }

    /// Generator that returns canned tweets and records every `count` it receives.
    struct CountRecordingGenerator {
        tweets: Vec<String>,
        seen_counts: Mutex<Vec<Option<usize>>>,
    }

    #[async_trait::async_trait]
    impl ThreadGenerator for CountRecordingGenerator {
        async fn generate_thread(
            &self,
            _topic: &str,
            count: Option<usize>,
        ) -> Result<Vec<String>, ContentLoopError> {
            self.seen_counts.lock().expect("lock").push(count);
            Ok(self.tweets.clone())
        }
    }

    /// Generator whose tweets are always 300 characters long, i.e. over the limit.
    struct OverlongThreadGenerator;

    #[async_trait::async_trait]
    impl ThreadGenerator for OverlongThreadGenerator {
        async fn generate_thread(
            &self,
            _topic: &str,
            _count: Option<usize>,
        ) -> Result<Vec<String>, ContentLoopError> {
            Ok(vec!["a".repeat(300), "b".repeat(300)])
        }
    }

    /// Generator that always fails.
    struct FailingThreadGenerator;

    #[async_trait::async_trait]
    impl ThreadGenerator for FailingThreadGenerator {
        async fn generate_thread(
            &self,
            _topic: &str,
            _count: Option<usize>,
        ) -> Result<Vec<String>, ContentLoopError> {
            Err(ContentLoopError::LlmFailure("model error".to_string()))
        }
    }

    /// Safety gate with fixed answers.
    struct MockSafety {
        can_tweet: bool,
        can_thread: bool,
    }

    #[async_trait::async_trait]
    impl ContentSafety for MockSafety {
        async fn can_post_tweet(&self) -> bool {
            self.can_tweet
        }
        async fn can_post_thread(&self) -> bool {
            self.can_thread
        }
    }

    /// In-memory storage that records every write.
    struct MockStorage {
        last_thread: Mutex<Option<chrono::DateTime<chrono::Utc>>>,
        threads: Mutex<Vec<(String, usize)>>,
        thread_statuses: Mutex<Vec<(String, String, usize)>>,
        thread_tweets: Mutex<Vec<(String, usize, String, String)>>,
        actions: Mutex<Vec<(String, String, String)>>,
    }

    impl MockStorage {
        fn new(last_thread: Option<chrono::DateTime<chrono::Utc>>) -> Self {
            Self {
                last_thread: Mutex::new(last_thread),
                threads: Mutex::new(Vec::new()),
                thread_statuses: Mutex::new(Vec::new()),
                thread_tweets: Mutex::new(Vec::new()),
                actions: Mutex::new(Vec::new()),
            }
        }

        fn thread_tweet_count(&self) -> usize {
            self.thread_tweets.lock().expect("lock").len()
        }

        fn action_statuses(&self) -> Vec<String> {
            self.actions
                .lock()
                .expect("lock")
                .iter()
                .map(|(_, status, _)| status.clone())
                .collect()
        }
    }

    #[async_trait::async_trait]
    impl ContentStorage for MockStorage {
        async fn last_tweet_time(
            &self,
        ) -> Result<Option<chrono::DateTime<chrono::Utc>>, ContentLoopError> {
            Ok(None)
        }

        async fn last_thread_time(
            &self,
        ) -> Result<Option<chrono::DateTime<chrono::Utc>>, ContentLoopError> {
            Ok(*self.last_thread.lock().expect("lock"))
        }

        async fn todays_tweet_times(
            &self,
        ) -> Result<Vec<chrono::DateTime<chrono::Utc>>, ContentLoopError> {
            Ok(Vec::new())
        }

        async fn post_tweet(&self, _topic: &str, _content: &str) -> Result<(), ContentLoopError> {
            Ok(())
        }

        async fn create_thread(
            &self,
            topic: &str,
            tweet_count: usize,
        ) -> Result<String, ContentLoopError> {
            let mut threads = self.threads.lock().expect("lock");
            let id = format!("thread-{}", threads.len() + 1);
            threads.push((topic.to_string(), tweet_count));
            Ok(id)
        }

        async fn update_thread_status(
            &self,
            thread_id: &str,
            status: &str,
            tweet_count: usize,
            _root_tweet_id: Option<&str>,
        ) -> Result<(), ContentLoopError> {
            self.thread_statuses.lock().expect("lock").push((
                thread_id.to_string(),
                status.to_string(),
                tweet_count,
            ));
            Ok(())
        }

        async fn store_thread_tweet(
            &self,
            thread_id: &str,
            position: usize,
            tweet_id: &str,
            content: &str,
        ) -> Result<(), ContentLoopError> {
            self.thread_tweets.lock().expect("lock").push((
                thread_id.to_string(),
                position,
                tweet_id.to_string(),
                content.to_string(),
            ));
            Ok(())
        }

        async fn log_action(
            &self,
            action_type: &str,
            status: &str,
            message: &str,
        ) -> Result<(), ContentLoopError> {
            self.actions.lock().expect("lock").push((
                action_type.to_string(),
                status.to_string(),
                message.to_string(),
            ));
            Ok(())
        }
    }

    /// Poster that records `(in_reply_to, content)` pairs and can fail at a given call index.
    struct MockPoster {
        posted: Mutex<Vec<(Option<String>, String)>>,
        fail_at_index: Option<usize>,
    }

    impl MockPoster {
        fn new() -> Self {
            Self {
                posted: Mutex::new(Vec::new()),
                fail_at_index: None,
            }
        }

        fn failing_at(index: usize) -> Self {
            Self {
                posted: Mutex::new(Vec::new()),
                fail_at_index: Some(index),
            }
        }

        fn posted_count(&self) -> usize {
            self.posted.lock().expect("lock").len()
        }

        /// Shared body of both posting methods: fail at the configured index, else record.
        fn record(
            &self,
            in_reply_to: Option<String>,
            content: &str,
        ) -> Result<String, ContentLoopError> {
            let mut posted = self.posted.lock().expect("lock");
            if self.fail_at_index == Some(posted.len()) {
                return Err(ContentLoopError::PostFailed("API error".to_string()));
            }
            let id = format!("tweet-{}", posted.len() + 1);
            posted.push((in_reply_to, content.to_string()));
            Ok(id)
        }
    }

    #[async_trait::async_trait]
    impl ThreadPoster for MockPoster {
        async fn post_tweet(&self, content: &str) -> Result<String, ContentLoopError> {
            self.record(None, content)
        }

        async fn reply_to_tweet(
            &self,
            in_reply_to: &str,
            content: &str,
        ) -> Result<String, ContentLoopError> {
            self.record(Some(in_reply_to.to_string()), content)
        }
    }

    fn make_topics() -> Vec<String> {
        vec![
            "Rust".to_string(),
            "CLI tools".to_string(),
            "Open source".to_string(),
        ]
    }

    fn make_thread_tweets() -> Vec<String> {
        vec![
            "Thread on Rust: Let me share what I've learned...".to_string(),
            "First, the ownership model is game-changing.".to_string(),
            "Second, pattern matching makes error handling elegant.".to_string(),
            "Third, the compiler is your best friend.".to_string(),
            "Finally, the community is incredibly welcoming.".to_string(),
        ]
    }

    /// Wraps canned tweets in a `MockThreadGenerator` trait object.
    fn canned_generator(tweets: Vec<String>) -> Arc<dyn ThreadGenerator> {
        Arc::new(MockThreadGenerator { tweets })
    }

    /// Builds a `ThreadLoop` with the one-week interval and a `MockSafety` gate.
    fn build_loop(
        generator: Arc<dyn ThreadGenerator>,
        can_thread: bool,
        storage: Arc<MockStorage>,
        poster: Arc<MockPoster>,
        topics: Vec<String>,
        dry_run: bool,
    ) -> ThreadLoop {
        ThreadLoop::new(
            generator,
            Arc::new(MockSafety {
                can_tweet: true,
                can_thread,
            }),
            storage,
            poster,
            topics,
            WEEK_SECS,
            dry_run,
        )
    }

    #[tokio::test]
    async fn run_once_posts_thread() {
        let storage = Arc::new(MockStorage::new(None));
        let poster = Arc::new(MockPoster::new());
        let thread_loop = build_loop(
            canned_generator(make_thread_tweets()),
            true,
            storage.clone(),
            poster.clone(),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        assert!(matches!(
            result,
            ThreadResult::Posted { tweet_count: 5, .. }
        ));
        assert_eq!(poster.posted_count(), 5);
        assert_eq!(storage.thread_tweet_count(), 5);
    }

    #[tokio::test]
    async fn run_once_dry_run_does_not_post() {
        let storage = Arc::new(MockStorage::new(None));
        let poster = Arc::new(MockPoster::new());
        let thread_loop = build_loop(
            canned_generator(make_thread_tweets()),
            true,
            storage.clone(),
            poster.clone(),
            make_topics(),
            true,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        assert!(matches!(result, ThreadResult::Posted { .. }));
        assert_eq!(poster.posted_count(), 0);
        assert_eq!(storage.action_statuses(), vec!["dry_run"]);
    }

    #[tokio::test]
    async fn run_once_rate_limited() {
        let thread_loop = build_loop(
            canned_generator(make_thread_tweets()),
            false,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(None, None).await;
        assert!(matches!(result, ThreadResult::RateLimited));
    }

    #[tokio::test]
    async fn run_once_no_topics() {
        let thread_loop = build_loop(
            canned_generator(make_thread_tweets()),
            true,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            Vec::new(),
            false,
        );

        let result = thread_loop.run_once(None, None).await;
        assert!(matches!(result, ThreadResult::NoTopics));
    }

    #[tokio::test]
    async fn run_once_generation_failure() {
        let thread_loop = build_loop(
            Arc::new(FailingThreadGenerator),
            true,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;
        assert!(matches!(result, ThreadResult::Failed { .. }));
    }

    #[tokio::test]
    async fn run_once_validation_failure() {
        let thread_loop = build_loop(
            Arc::new(OverlongThreadGenerator),
            true,
            Arc::new(MockStorage::new(None)),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;
        assert!(matches!(result, ThreadResult::ValidationFailed { .. }));
    }

    #[tokio::test]
    async fn partial_failure_records_correctly() {
        let storage = Arc::new(MockStorage::new(None));
        // The third posting call (index 2) fails.
        let poster = Arc::new(MockPoster::failing_at(2));
        let thread_loop = build_loop(
            canned_generator(make_thread_tweets()),
            true,
            storage.clone(),
            poster.clone(),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;

        match result {
            ThreadResult::PartialFailure {
                tweets_posted,
                total_tweets,
                ..
            } => {
                assert_eq!(tweets_posted, 2);
                assert_eq!(total_tweets, 5);
            }
            other => panic!("Expected PartialFailure, got {other:?}"),
        }

        assert_eq!(storage.thread_tweet_count(), 2);
        assert_eq!(poster.posted_count(), 2);
    }

    #[tokio::test]
    async fn run_once_clamps_count() {
        let poster = Arc::new(MockPoster::new());
        let storage = Arc::new(MockStorage::new(None));
        // A single-tweet thread keeps the test free of inter-tweet delays.
        let generator = Arc::new(CountRecordingGenerator {
            tweets: vec!["Only tweet".to_string()],
            seen_counts: Mutex::new(Vec::new()),
        });
        let thread_loop = build_loop(
            generator.clone(),
            true,
            storage,
            poster.clone(),
            make_topics(),
            false,
        );

        for requested in [1, 5, 100] {
            let result = thread_loop.run_once(Some("Rust"), Some(requested)).await;
            assert!(matches!(result, ThreadResult::Posted { .. }));
        }

        // Below-range, in-range and above-range requests reach the generator as 2, 5 and 15.
        assert_eq!(
            *generator.seen_counts.lock().expect("lock"),
            vec![Some(2), Some(5), Some(15)]
        );
        assert_eq!(poster.posted_count(), 3);
    }

    #[tokio::test]
    async fn run_iteration_skips_when_too_soon() {
        // Last thread three days ago, interval one week.
        let last_thread = chrono::Utc::now() - chrono::Duration::days(3);
        let thread_loop = build_loop(
            canned_generator(make_thread_tweets()),
            true,
            Arc::new(MockStorage::new(Some(last_thread))),
            Arc::new(MockPoster::new()),
            make_topics(),
            false,
        );

        let mut recent = Vec::new();
        let mut rng = rand::thread_rng();
        let result = thread_loop.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ThreadResult::TooSoon { .. }));
    }

    #[tokio::test]
    async fn run_iteration_posts_when_interval_elapsed() {
        // Last thread eight days ago, interval one week.
        let last_thread = chrono::Utc::now() - chrono::Duration::days(8);
        let poster = Arc::new(MockPoster::new());
        let thread_loop = build_loop(
            canned_generator(make_thread_tweets()),
            true,
            Arc::new(MockStorage::new(Some(last_thread))),
            poster.clone(),
            make_topics(),
            false,
        );

        let mut recent = Vec::new();
        let mut rng = rand::thread_rng();
        let result = thread_loop.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ThreadResult::Posted { .. }));
        assert_eq!(poster.posted_count(), 5);
        assert_eq!(recent.len(), 1);
    }

    #[tokio::test]
    async fn reply_chain_structure_correct() {
        let poster = Arc::new(MockPoster::new());
        let thread_loop = build_loop(
            canned_generator(vec![
                "First".to_string(),
                "Second".to_string(),
                "Third".to_string(),
            ]),
            true,
            Arc::new(MockStorage::new(None)),
            poster.clone(),
            make_topics(),
            false,
        );

        let result = thread_loop.run_once(Some("Rust"), None).await;
        assert!(matches!(
            result,
            ThreadResult::Posted { tweet_count: 3, .. }
        ));

        // Root has no parent; each reply points at the tweet posted just before it.
        let posted = poster.posted.lock().expect("lock");
        assert_eq!(posted[0].0, None);
        assert_eq!(posted[1].0, Some("tweet-1".to_string()));
        assert_eq!(posted[2].0, Some("tweet-2".to_string()));
    }
}