1use super::super::schedule::{apply_slot_jitter, schedule_gate, ActiveSchedule};
7use super::super::scheduler::LoopScheduler;
8use super::{ThreadLoop, ThreadResult};
9use rand::seq::IndexedRandom;
10use rand::SeedableRng;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio_util::sync::CancellationToken;
14
15impl ThreadLoop {
16 pub async fn run(
18 &self,
19 cancel: CancellationToken,
20 scheduler: LoopScheduler,
21 schedule: Option<Arc<ActiveSchedule>>,
22 ) {
23 let slot_mode = schedule
24 .as_ref()
25 .is_some_and(|s| s.has_thread_preferred_schedule());
26
27 tracing::info!(
28 dry_run = self.dry_run,
29 topics = self.topics.len(),
30 thread_interval_secs = self.thread_interval_secs,
31 slot_mode = slot_mode,
32 "Thread loop started"
33 );
34
35 if self.topics.is_empty() {
36 tracing::warn!("No topics configured, thread loop has nothing to post");
37 cancel.cancelled().await;
38 return;
39 }
40
41 let min_recent = 3usize;
42 let max_recent = (self.topics.len() / 2)
43 .max(min_recent)
44 .min(self.topics.len());
45 let mut recent_topics: Vec<String> = Vec::with_capacity(max_recent);
46 let mut rng = rand::rngs::StdRng::from_rng(&mut rand::rng());
47
48 loop {
49 if cancel.is_cancelled() {
50 break;
51 }
52
53 if !schedule_gate(&schedule, &cancel).await {
54 break;
55 }
56
57 if slot_mode {
58 let sched = schedule.as_ref().expect("slot_mode requires schedule");
59
60 match sched.next_thread_slot() {
61 Some(wait) => {
62 let jittered_wait = apply_slot_jitter(wait);
63 tracing::info!(
64 wait_secs = jittered_wait.as_secs(),
65 "Thread slot mode: sleeping until preferred thread time"
66 );
67
68 tokio::select! {
69 _ = cancel.cancelled() => break,
70 _ = tokio::time::sleep(jittered_wait) => {},
71 }
72
73 if cancel.is_cancelled() {
74 break;
75 }
76
77 if !self.safety.can_post_thread().await {
78 Self::log_thread_result(&ThreadResult::RateLimited, self.dry_run);
79 continue;
80 }
81
82 let topic = super::pick_topic(&self.topics, &mut recent_topics, &mut rng);
83 let result = self.generate_and_post(&topic, None).await;
84
85 if matches!(result, ThreadResult::Posted { .. }) {
86 if recent_topics.len() >= max_recent {
87 recent_topics.remove(0);
88 }
89 recent_topics.push(topic);
90 }
91
92 Self::log_thread_result(&result, self.dry_run);
93 }
94 None => {
95 tracing::warn!("Thread slot mode: no next slot found, sleeping 1 hour");
96 tokio::select! {
97 _ = cancel.cancelled() => break,
98 _ = tokio::time::sleep(Duration::from_secs(3600)) => {},
99 }
100 }
101 }
102 } else {
103 let result = self
104 .run_iteration(&mut recent_topics, max_recent, &mut rng)
105 .await;
106 Self::log_thread_result(&result, self.dry_run);
107
108 tokio::select! {
109 _ = cancel.cancelled() => break,
110 _ = scheduler.tick() => {},
111 }
112 }
113 }
114
115 tracing::info!("Thread loop stopped");
116 }
117
118 pub(super) fn log_thread_result(result: &ThreadResult, dry_run: bool) {
120 match result {
121 ThreadResult::Posted {
122 topic, tweet_count, ..
123 } => {
124 tracing::info!(
125 topic = %topic,
126 tweets = tweet_count,
127 dry_run = dry_run,
128 "Thread iteration: thread posted"
129 );
130 }
131 ThreadResult::PartialFailure {
132 tweets_posted,
133 total_tweets,
134 error,
135 ..
136 } => {
137 tracing::warn!(
138 posted = tweets_posted,
139 total = total_tweets,
140 error = %error,
141 "Thread iteration: partial failure"
142 );
143 }
144 ThreadResult::TooSoon { .. } => {
145 tracing::debug!("Thread iteration: too soon since last thread");
146 }
147 ThreadResult::RateLimited => {
148 tracing::info!("Thread iteration: weekly thread limit reached");
149 }
150 ThreadResult::NoTopics => {
151 tracing::warn!("Thread iteration: no topics available");
152 }
153 ThreadResult::ValidationFailed { error } => {
154 tracing::warn!(error = %error, "Thread iteration: validation failed");
155 }
156 ThreadResult::Failed { error } => {
157 tracing::warn!(error = %error, "Thread iteration: failed");
158 }
159 }
160 }
161
162 pub async fn run_once(&self, topic: Option<&str>, count: Option<usize>) -> ThreadResult {
167 let chosen_topic = match topic {
168 Some(t) => t.to_string(),
169 None => {
170 if self.topics.is_empty() {
171 return ThreadResult::NoTopics;
172 }
173 let mut rng = rand::rng();
174 self.topics
175 .choose(&mut rng)
176 .expect("topics is non-empty")
177 .clone()
178 }
179 };
180
181 let clamped_count = count.map(|c| c.clamp(2, 15));
182
183 if !self.safety.can_post_thread().await {
184 return ThreadResult::RateLimited;
185 }
186
187 self.generate_and_post(&chosen_topic, clamped_count).await
188 }
189
190 pub(super) async fn run_iteration(
192 &self,
193 recent_topics: &mut Vec<String>,
194 max_recent: usize,
195 rng: &mut impl rand::Rng,
196 ) -> ThreadResult {
197 match self.storage.last_thread_time().await {
198 Ok(Some(last_time)) => {
199 let elapsed = chrono::Utc::now()
200 .signed_duration_since(last_time)
201 .num_seconds()
202 .max(0) as u64;
203
204 if elapsed < self.thread_interval_secs {
205 return ThreadResult::TooSoon {
206 elapsed_secs: elapsed,
207 interval_secs: self.thread_interval_secs,
208 };
209 }
210 }
211 Ok(None) => {}
212 Err(e) => {
213 tracing::warn!(error = %e, "Failed to query last thread time, proceeding anyway");
214 }
215 }
216
217 if !self.safety.can_post_thread().await {
218 return ThreadResult::RateLimited;
219 }
220
221 let topic = super::pick_topic(&self.topics, recent_topics, rng);
222
223 let result = self.generate_and_post(&topic, None).await;
224
225 if matches!(result, ThreadResult::Posted { .. }) {
226 if recent_topics.len() >= max_recent {
227 recent_topics.remove(0);
228 }
229 recent_topics.push(topic);
230 }
231
232 result
233 }
234}
235
#[cfg(test)]
mod tests {
    use super::super::test_mocks::{
        make_thread_tweets, make_topics, MockPoster, MockSafety, MockStorage, MockThreadGenerator,
    };
    use super::super::{ThreadLoop, ThreadResult};
    use crate::automation::scheduler::LoopScheduler;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    /// Thread interval used by most tests: 604800 seconds (seven days).
    const WEEK_SECS: u64 = 604800;

    /// Builds a non-dry-run `ThreadLoop` wired to the mock collaborators.
    ///
    /// `tweets` is what the mock generator holds, `can_thread` is the mock
    /// safety answer for threads (`can_tweet` is always `true`), and the
    /// remaining arguments are forwarded to `ThreadLoop::new` unchanged.
    fn thread_loop(
        tweets: Vec<String>,
        can_thread: bool,
        storage: Arc<MockStorage>,
        poster: Arc<MockPoster>,
        topics: Vec<String>,
        thread_interval_secs: u64,
    ) -> ThreadLoop {
        ThreadLoop::new(
            Arc::new(MockThreadGenerator { tweets }),
            Arc::new(MockSafety {
                can_tweet: true,
                can_thread,
            }),
            storage,
            poster,
            topics,
            thread_interval_secs,
            false,
        )
    }

    /// Mock storage with no previous thread on record.
    fn storage_without_thread() -> Arc<MockStorage> {
        Arc::new(MockStorage::new(None))
    }

    /// Mock storage whose last thread was posted `days` days ago.
    fn storage_with_thread_days_ago(days: i64) -> Arc<MockStorage> {
        let last_thread = chrono::Utc::now() - chrono::Duration::days(days);
        Arc::new(MockStorage::new(Some(last_thread)))
    }

    /// `run_once` reports `RateLimited` when the safety check refuses threads.
    #[tokio::test]
    async fn run_once_rate_limited() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            false,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let result = loop_.run_once(None, None).await;
        assert!(matches!(result, ThreadResult::RateLimited));
    }

    /// `run_once` without an explicit topic and without configured topics
    /// reports `NoTopics`.
    #[tokio::test]
    async fn run_once_no_topics() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            Vec::new(),
            WEEK_SECS,
        );

        let result = loop_.run_once(None, None).await;
        assert!(matches!(result, ThreadResult::NoTopics));
    }

    /// A requested count below the allowed minimum still results in a post.
    #[tokio::test]
    async fn run_once_clamps_count() {
        let tweets = vec![
            "Tweet 1".to_string(),
            "Tweet 2".to_string(),
            "Tweet 3".to_string(),
        ];
        let loop_ = thread_loop(
            tweets,
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let result = loop_.run_once(Some("Rust"), Some(1)).await;
        assert!(matches!(result, ThreadResult::Posted { .. }));
    }

    /// A thread posted three days ago is within the seven-day interval.
    #[tokio::test]
    async fn run_iteration_skips_when_too_soon() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_with_thread_days_ago(3),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ThreadResult::TooSoon { .. }));
    }

    /// A thread posted eight days ago is outside the seven-day interval, so a
    /// new thread is posted and its topic is remembered.
    #[tokio::test]
    async fn run_iteration_posts_when_interval_elapsed() {
        let poster = Arc::new(MockPoster::new());
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_with_thread_days_ago(8),
            Arc::clone(&poster),
            make_topics(),
            WEEK_SECS,
        );

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ThreadResult::Posted { .. }));
        assert_eq!(poster.posted_count(), 5);
        assert_eq!(recent.len(), 1);
    }

    /// `run` returns promptly when the token is already cancelled.
    #[tokio::test]
    async fn run_cancels_immediately_with_topics() {
        let cancel = CancellationToken::new();
        cancel.cancel();

        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let scheduler =
            LoopScheduler::new(Duration::from_secs(3600), Duration::ZERO, Duration::ZERO);
        loop_.run(cancel, scheduler, None).await;
    }

    /// With no topics, `run` waits for cancellation and then returns.
    #[tokio::test]
    async fn run_no_topics_exits_on_cancel() {
        let cancel = CancellationToken::new();
        cancel.cancel();

        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            vec![],
            WEEK_SECS,
        );

        let scheduler = LoopScheduler::new(Duration::from_secs(1), Duration::ZERO, Duration::ZERO);
        loop_.run(cancel, scheduler, None).await;
    }

    /// Interval mode keeps iterating (each pass is "too soon" here) until the
    /// token is cancelled from another task.
    #[tokio::test]
    async fn run_interval_mode_one_iteration_then_cancel() {
        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();

        // Last thread is "now" and the interval is large, so nothing is posted.
        let storage = Arc::new(MockStorage::new(Some(chrono::Utc::now())));
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage,
            Arc::new(MockPoster::new()),
            make_topics(),
            999999,
        );

        let scheduler =
            LoopScheduler::new(Duration::from_millis(1), Duration::ZERO, Duration::ZERO);

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            cancel_clone.cancel();
        });

        tokio::time::timeout(Duration::from_secs(5), loop_.run(cancel, scheduler, None))
            .await
            .expect("run() should complete within timeout");
    }

    /// Smoke test: logging must not panic for any `ThreadResult` variant.
    #[test]
    fn log_thread_result_all_variants() {
        let results = [
            ThreadResult::Posted {
                topic: "Rust".to_string(),
                tweet_count: 3,
                thread_id: "t1".to_string(),
            },
            ThreadResult::PartialFailure {
                topic: "Rust".to_string(),
                tweets_posted: 2,
                total_tweets: 5,
                error: "network error".to_string(),
            },
            ThreadResult::TooSoon {
                elapsed_secs: 10,
                interval_secs: 604800,
            },
            ThreadResult::RateLimited,
            ThreadResult::NoTopics,
            ThreadResult::ValidationFailed {
                error: "too long".to_string(),
            },
            ThreadResult::Failed {
                error: "llm failed".to_string(),
            },
        ];

        for result in &results {
            ThreadLoop::log_thread_result(result, false);
        }
    }

    /// An explicit topic is used as-is and reported back in the result.
    #[tokio::test]
    async fn run_once_with_specific_topic() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let result = loop_.run_once(Some("CLI tools"), None).await;
        let ThreadResult::Posted { topic, .. } = result else {
            panic!("expected ThreadResult::Posted");
        };
        assert_eq!(topic, "CLI tools");
    }

    /// A requested count above the allowed maximum still results in a post.
    #[tokio::test]
    async fn run_once_with_custom_count() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let result = loop_.run_once(Some("Rust"), Some(20)).await;
        assert!(matches!(result, ThreadResult::Posted { .. }));
    }

    /// Even when the interval has elapsed, a refusing safety check yields
    /// `RateLimited`.
    #[tokio::test]
    async fn run_iteration_rate_limited() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            false,
            storage_with_thread_days_ago(8),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ThreadResult::RateLimited));
    }

    /// With no previous thread on record the interval check is skipped.
    #[tokio::test]
    async fn run_iteration_posts_when_no_previous_thread() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ThreadResult::Posted { .. }));
        assert_eq!(recent.len(), 1);
    }

    /// A full recent-topic list stays at its cap: the oldest entry is evicted
    /// to make room for the newly posted topic.
    #[tokio::test]
    async fn run_iteration_caps_recent_topics() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_with_thread_days_ago(8),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let mut recent = vec!["A".to_string(), "B".to_string(), "C".to_string()];
        let max_recent = 3;
        let mut rng = rand::rng();
        let result = loop_.run_iteration(&mut recent, max_recent, &mut rng).await;

        // Assert the post unconditionally so the cap checks below cannot be skipped.
        assert!(matches!(result, ThreadResult::Posted { .. }));
        assert_eq!(recent.len(), max_recent);
        assert_eq!(recent[0], "B");
        assert_eq!(recent[1], "C");
    }

    /// Smoke test: logging a posted result with `dry_run = true` must not panic.
    #[test]
    fn log_thread_result_dry_run_true() {
        let posted = ThreadResult::Posted {
            topic: "Rust".to_string(),
            tweet_count: 5,
            thread_id: "t2".to_string(),
        };
        ThreadLoop::log_thread_result(&posted, true);
    }

    /// Without an explicit topic, the posted topic is one of the configured ones.
    #[tokio::test]
    async fn run_once_random_topic() {
        let loop_ = thread_loop(
            make_thread_tweets(),
            true,
            storage_without_thread(),
            Arc::new(MockPoster::new()),
            make_topics(),
            WEEK_SECS,
        );

        let result = loop_.run_once(None, None).await;
        let ThreadResult::Posted { topic, .. } = result else {
            panic!("expected ThreadResult::Posted");
        };
        assert!(make_topics().contains(&topic));
    }
}