Skip to main content

tuitbot_core/automation/thread_loop/
planner.rs

1//! Run loop, scheduling, iteration, and topic selection.
2//!
3//! Implements `run`, `run_once`, `run_iteration`, and `log_thread_result`
4//! on [`ThreadLoop`].
5
6use super::super::schedule::{apply_slot_jitter, schedule_gate, ActiveSchedule};
7use super::super::scheduler::LoopScheduler;
8use super::{ThreadLoop, ThreadResult};
9use rand::seq::IndexedRandom;
10use rand::SeedableRng;
11use std::sync::Arc;
12use std::time::Duration;
13use tokio_util::sync::CancellationToken;
14
15impl ThreadLoop {
16    /// Run the continuous thread loop until cancellation.
17    pub async fn run(
18        &self,
19        cancel: CancellationToken,
20        scheduler: LoopScheduler,
21        schedule: Option<Arc<ActiveSchedule>>,
22    ) {
23        let slot_mode = schedule
24            .as_ref()
25            .is_some_and(|s| s.has_thread_preferred_schedule());
26
27        tracing::info!(
28            dry_run = self.dry_run,
29            topics = self.topics.len(),
30            thread_interval_secs = self.thread_interval_secs,
31            slot_mode = slot_mode,
32            "Thread loop started"
33        );
34
35        if self.topics.is_empty() {
36            tracing::warn!("No topics configured, thread loop has nothing to post");
37            cancel.cancelled().await;
38            return;
39        }
40
41        let min_recent = 3usize;
42        let max_recent = (self.topics.len() / 2)
43            .max(min_recent)
44            .min(self.topics.len());
45        let mut recent_topics: Vec<String> = Vec::with_capacity(max_recent);
46        let mut rng = rand::rngs::StdRng::from_rng(&mut rand::rng());
47
48        loop {
49            if cancel.is_cancelled() {
50                break;
51            }
52
53            if !schedule_gate(&schedule, &cancel).await {
54                break;
55            }
56
57            if slot_mode {
58                let sched = schedule.as_ref().expect("slot_mode requires schedule");
59
60                match sched.next_thread_slot() {
61                    Some(wait) => {
62                        let jittered_wait = apply_slot_jitter(wait);
63                        tracing::info!(
64                            wait_secs = jittered_wait.as_secs(),
65                            "Thread slot mode: sleeping until preferred thread time"
66                        );
67
68                        tokio::select! {
69                            _ = cancel.cancelled() => break,
70                            _ = tokio::time::sleep(jittered_wait) => {},
71                        }
72
73                        if cancel.is_cancelled() {
74                            break;
75                        }
76
77                        if !self.safety.can_post_thread().await {
78                            Self::log_thread_result(&ThreadResult::RateLimited, self.dry_run);
79                            continue;
80                        }
81
82                        let topic = super::pick_topic(&self.topics, &mut recent_topics, &mut rng);
83                        let result = self.generate_and_post(&topic, None).await;
84
85                        if matches!(result, ThreadResult::Posted { .. }) {
86                            if recent_topics.len() >= max_recent {
87                                recent_topics.remove(0);
88                            }
89                            recent_topics.push(topic);
90                        }
91
92                        Self::log_thread_result(&result, self.dry_run);
93                    }
94                    None => {
95                        tracing::warn!("Thread slot mode: no next slot found, sleeping 1 hour");
96                        tokio::select! {
97                            _ = cancel.cancelled() => break,
98                            _ = tokio::time::sleep(Duration::from_secs(3600)) => {},
99                        }
100                    }
101                }
102            } else {
103                let result = self
104                    .run_iteration(&mut recent_topics, max_recent, &mut rng)
105                    .await;
106                Self::log_thread_result(&result, self.dry_run);
107
108                tokio::select! {
109                    _ = cancel.cancelled() => break,
110                    _ = scheduler.tick() => {},
111                }
112            }
113        }
114
115        tracing::info!("Thread loop stopped");
116    }
117
118    /// Log the result of a thread iteration.
119    pub(super) fn log_thread_result(result: &ThreadResult, dry_run: bool) {
120        match result {
121            ThreadResult::Posted {
122                topic, tweet_count, ..
123            } => {
124                tracing::info!(
125                    topic = %topic,
126                    tweets = tweet_count,
127                    dry_run = dry_run,
128                    "Thread iteration: thread posted"
129                );
130            }
131            ThreadResult::PartialFailure {
132                tweets_posted,
133                total_tweets,
134                error,
135                ..
136            } => {
137                tracing::warn!(
138                    posted = tweets_posted,
139                    total = total_tweets,
140                    error = %error,
141                    "Thread iteration: partial failure"
142                );
143            }
144            ThreadResult::TooSoon { .. } => {
145                tracing::debug!("Thread iteration: too soon since last thread");
146            }
147            ThreadResult::RateLimited => {
148                tracing::info!("Thread iteration: weekly thread limit reached");
149            }
150            ThreadResult::NoTopics => {
151                tracing::warn!("Thread iteration: no topics available");
152            }
153            ThreadResult::ValidationFailed { error } => {
154                tracing::warn!(error = %error, "Thread iteration: validation failed");
155            }
156            ThreadResult::Failed { error } => {
157                tracing::warn!(error = %error, "Thread iteration: failed");
158            }
159        }
160    }
161
162    /// Run a single thread generation (for CLI `tuitbot thread` command).
163    ///
164    /// If `topic` is provided, uses that topic. Otherwise picks a random one.
165    /// If `count` is provided, generates exactly that many tweets (clamped 2-15).
166    pub async fn run_once(&self, topic: Option<&str>, count: Option<usize>) -> ThreadResult {
167        let chosen_topic = match topic {
168            Some(t) => t.to_string(),
169            None => {
170                if self.topics.is_empty() {
171                    return ThreadResult::NoTopics;
172                }
173                let mut rng = rand::rng();
174                self.topics
175                    .choose(&mut rng)
176                    .expect("topics is non-empty")
177                    .clone()
178            }
179        };
180
181        let clamped_count = count.map(|c| c.clamp(2, 15));
182
183        if !self.safety.can_post_thread().await {
184            return ThreadResult::RateLimited;
185        }
186
187        self.generate_and_post(&chosen_topic, clamped_count).await
188    }
189
190    /// Run a single iteration of the continuous loop.
191    pub(super) async fn run_iteration(
192        &self,
193        recent_topics: &mut Vec<String>,
194        max_recent: usize,
195        rng: &mut impl rand::Rng,
196    ) -> ThreadResult {
197        match self.storage.last_thread_time().await {
198            Ok(Some(last_time)) => {
199                let elapsed = chrono::Utc::now()
200                    .signed_duration_since(last_time)
201                    .num_seconds()
202                    .max(0) as u64;
203
204                if elapsed < self.thread_interval_secs {
205                    return ThreadResult::TooSoon {
206                        elapsed_secs: elapsed,
207                        interval_secs: self.thread_interval_secs,
208                    };
209                }
210            }
211            Ok(None) => {}
212            Err(e) => {
213                tracing::warn!(error = %e, "Failed to query last thread time, proceeding anyway");
214            }
215        }
216
217        if !self.safety.can_post_thread().await {
218            return ThreadResult::RateLimited;
219        }
220
221        let topic = super::pick_topic(&self.topics, recent_topics, rng);
222
223        let result = self.generate_and_post(&topic, None).await;
224
225        if matches!(result, ThreadResult::Posted { .. }) {
226            if recent_topics.len() >= max_recent {
227                recent_topics.remove(0);
228            }
229            recent_topics.push(topic);
230        }
231
232        result
233    }
234}
235
236// ---------------------------------------------------------------------------
237// Tests
238// ---------------------------------------------------------------------------
239
240#[cfg(test)]
241mod tests {
242    use super::super::test_mocks::{
243        make_thread_tweets, make_topics, MockPoster, MockSafety, MockStorage, MockThreadGenerator,
244    };
245    use super::super::{ThreadLoop, ThreadResult};
246    use std::sync::Arc;
247
248    #[tokio::test]
249    async fn run_once_rate_limited() {
250        let loop_ = ThreadLoop::new(
251            Arc::new(MockThreadGenerator {
252                tweets: make_thread_tweets(),
253            }),
254            Arc::new(MockSafety {
255                can_tweet: true,
256                can_thread: false,
257            }),
258            Arc::new(MockStorage::new(None)),
259            Arc::new(MockPoster::new()),
260            make_topics(),
261            604800,
262            false,
263        );
264
265        let result = loop_.run_once(None, None).await;
266        assert!(matches!(result, ThreadResult::RateLimited));
267    }
268
269    #[tokio::test]
270    async fn run_once_no_topics() {
271        let loop_ = ThreadLoop::new(
272            Arc::new(MockThreadGenerator {
273                tweets: make_thread_tweets(),
274            }),
275            Arc::new(MockSafety {
276                can_tweet: true,
277                can_thread: true,
278            }),
279            Arc::new(MockStorage::new(None)),
280            Arc::new(MockPoster::new()),
281            Vec::new(),
282            604800,
283            false,
284        );
285
286        let result = loop_.run_once(None, None).await;
287        assert!(matches!(result, ThreadResult::NoTopics));
288    }
289
290    #[tokio::test]
291    async fn run_once_clamps_count() {
292        let poster = Arc::new(MockPoster::new());
293        let tweets = vec![
294            "Tweet 1".to_string(),
295            "Tweet 2".to_string(),
296            "Tweet 3".to_string(),
297        ];
298
299        let loop_ = ThreadLoop::new(
300            Arc::new(MockThreadGenerator { tweets }),
301            Arc::new(MockSafety {
302                can_tweet: true,
303                can_thread: true,
304            }),
305            Arc::new(MockStorage::new(None)),
306            poster.clone(),
307            make_topics(),
308            604800,
309            false,
310        );
311
312        // count=1 clamped to 2; mock ignores count but result should be Posted
313        let result = loop_.run_once(Some("Rust"), Some(1)).await;
314        assert!(matches!(result, ThreadResult::Posted { .. }));
315    }
316
317    #[tokio::test]
318    async fn run_iteration_skips_when_too_soon() {
319        let now = chrono::Utc::now();
320        let last_thread = now - chrono::Duration::days(3);
321        let storage = Arc::new(MockStorage::new(Some(last_thread)));
322
323        let loop_ = ThreadLoop::new(
324            Arc::new(MockThreadGenerator {
325                tweets: make_thread_tweets(),
326            }),
327            Arc::new(MockSafety {
328                can_tweet: true,
329                can_thread: true,
330            }),
331            storage,
332            Arc::new(MockPoster::new()),
333            make_topics(),
334            604800, // 7 days
335            false,
336        );
337
338        let mut recent = Vec::new();
339        let mut rng = rand::rng();
340        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
341        assert!(matches!(result, ThreadResult::TooSoon { .. }));
342    }
343
344    #[tokio::test]
345    async fn run_iteration_posts_when_interval_elapsed() {
346        let now = chrono::Utc::now();
347        let last_thread = now - chrono::Duration::days(8);
348        let storage = Arc::new(MockStorage::new(Some(last_thread)));
349        let poster = Arc::new(MockPoster::new());
350
351        let loop_ = ThreadLoop::new(
352            Arc::new(MockThreadGenerator {
353                tweets: make_thread_tweets(),
354            }),
355            Arc::new(MockSafety {
356                can_tweet: true,
357                can_thread: true,
358            }),
359            storage,
360            poster.clone(),
361            make_topics(),
362            604800, // 7 days
363            false,
364        );
365
366        let mut recent = Vec::new();
367        let mut rng = rand::rng();
368        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
369        assert!(matches!(result, ThreadResult::Posted { .. }));
370        assert_eq!(poster.posted_count(), 5);
371        assert_eq!(recent.len(), 1);
372    }
373
374    // -------------------------------------------------------------------------
375    // run() loop — cancellation coverage
376    // -------------------------------------------------------------------------
377
378    #[tokio::test]
379    async fn run_cancels_immediately_with_topics() {
380        // Pre-cancel: loop sees is_cancelled() == true and exits without posting.
381        use crate::automation::scheduler::LoopScheduler;
382        use std::time::Duration;
383        use tokio_util::sync::CancellationToken;
384
385        let cancel = CancellationToken::new();
386        cancel.cancel();
387
388        let loop_ = ThreadLoop::new(
389            Arc::new(MockThreadGenerator {
390                tweets: make_thread_tweets(),
391            }),
392            Arc::new(MockSafety {
393                can_tweet: true,
394                can_thread: true,
395            }),
396            Arc::new(MockStorage::new(None)),
397            Arc::new(MockPoster::new()),
398            make_topics(),
399            604800,
400            false,
401        );
402
403        let scheduler =
404            LoopScheduler::new(Duration::from_secs(3600), Duration::ZERO, Duration::ZERO);
405        loop_.run(cancel, scheduler, None).await;
406    }
407
408    #[tokio::test]
409    async fn run_no_topics_exits_on_cancel() {
410        // Empty topics: awaits cancel immediately.
411        use crate::automation::scheduler::LoopScheduler;
412        use std::time::Duration;
413        use tokio_util::sync::CancellationToken;
414
415        let cancel = CancellationToken::new();
416        cancel.cancel();
417
418        let loop_ = ThreadLoop::new(
419            Arc::new(MockThreadGenerator {
420                tweets: make_thread_tweets(),
421            }),
422            Arc::new(MockSafety {
423                can_tweet: true,
424                can_thread: true,
425            }),
426            Arc::new(MockStorage::new(None)),
427            Arc::new(MockPoster::new()),
428            vec![], // no topics
429            604800,
430            false,
431        );
432
433        let scheduler = LoopScheduler::new(Duration::from_secs(1), Duration::ZERO, Duration::ZERO);
434        loop_.run(cancel, scheduler, None).await;
435    }
436
437    #[tokio::test]
438    async fn run_interval_mode_one_iteration_then_cancel() {
439        // Interval mode: one fast iteration (TooSoon), then cancel fires.
440        // Using large interval so run_iteration returns TooSoon quickly.
441        use crate::automation::scheduler::LoopScheduler;
442        use std::time::Duration;
443        use tokio_util::sync::CancellationToken;
444
445        let cancel = CancellationToken::new();
446        let cancel_clone = cancel.clone();
447
448        // last_thread = now → elapsed is ~0 → with interval=999999 → TooSoon
449        let storage = Arc::new(MockStorage::new(Some(chrono::Utc::now())));
450
451        let loop_ = ThreadLoop::new(
452            Arc::new(MockThreadGenerator {
453                tweets: make_thread_tweets(),
454            }),
455            Arc::new(MockSafety {
456                can_tweet: true,
457                can_thread: true,
458            }),
459            storage,
460            Arc::new(MockPoster::new()),
461            make_topics(),
462            999999, // large interval → TooSoon every time
463            false,
464        );
465
466        let scheduler =
467            LoopScheduler::new(Duration::from_millis(1), Duration::ZERO, Duration::ZERO);
468
469        tokio::spawn(async move {
470            tokio::time::sleep(Duration::from_millis(50)).await;
471            cancel_clone.cancel();
472        });
473
474        tokio::time::timeout(Duration::from_secs(5), loop_.run(cancel, scheduler, None))
475            .await
476            .expect("run() should complete within timeout");
477    }
478
479    // -------------------------------------------------------------------------
480    // log_thread_result — all arms
481    // -------------------------------------------------------------------------
482
483    #[test]
484    fn log_thread_result_all_variants() {
485        // Exercises every match arm — no panics (tracing calls)
486        ThreadLoop::log_thread_result(
487            &ThreadResult::Posted {
488                topic: "Rust".to_string(),
489                tweet_count: 3,
490                thread_id: "t1".to_string(),
491            },
492            false,
493        );
494        ThreadLoop::log_thread_result(
495            &ThreadResult::PartialFailure {
496                topic: "Rust".to_string(),
497                tweets_posted: 2,
498                total_tweets: 5,
499                error: "network error".to_string(),
500            },
501            false,
502        );
503        ThreadLoop::log_thread_result(
504            &ThreadResult::TooSoon {
505                elapsed_secs: 10,
506                interval_secs: 604800,
507            },
508            false,
509        );
510        ThreadLoop::log_thread_result(&ThreadResult::RateLimited, false);
511        ThreadLoop::log_thread_result(&ThreadResult::NoTopics, false);
512        ThreadLoop::log_thread_result(
513            &ThreadResult::ValidationFailed {
514                error: "too long".to_string(),
515            },
516            false,
517        );
518        ThreadLoop::log_thread_result(
519            &ThreadResult::Failed {
520                error: "llm failed".to_string(),
521            },
522            false,
523        );
524    }
525
526    // -----------------------------------------------------------------------
527    // Additional thread planner coverage tests
528    // -----------------------------------------------------------------------
529
530    #[tokio::test]
531    async fn run_once_with_specific_topic() {
532        let poster = Arc::new(MockPoster::new());
533        let loop_ = ThreadLoop::new(
534            Arc::new(MockThreadGenerator {
535                tweets: make_thread_tweets(),
536            }),
537            Arc::new(MockSafety {
538                can_tweet: true,
539                can_thread: true,
540            }),
541            Arc::new(MockStorage::new(None)),
542            poster.clone(),
543            make_topics(),
544            604800,
545            false,
546        );
547
548        let result = loop_.run_once(Some("CLI tools"), None).await;
549        assert!(matches!(result, ThreadResult::Posted { .. }));
550        if let ThreadResult::Posted { topic, .. } = result {
551            assert_eq!(topic, "CLI tools");
552        }
553    }
554
555    #[tokio::test]
556    async fn run_once_with_custom_count() {
557        let poster = Arc::new(MockPoster::new());
558        let loop_ = ThreadLoop::new(
559            Arc::new(MockThreadGenerator {
560                tweets: make_thread_tweets(),
561            }),
562            Arc::new(MockSafety {
563                can_tweet: true,
564                can_thread: true,
565            }),
566            Arc::new(MockStorage::new(None)),
567            poster.clone(),
568            make_topics(),
569            604800,
570            false,
571        );
572
573        // count=20 should clamp to 15
574        let result = loop_.run_once(Some("Rust"), Some(20)).await;
575        assert!(matches!(result, ThreadResult::Posted { .. }));
576    }
577
578    #[tokio::test]
579    async fn run_iteration_rate_limited() {
580        let now = chrono::Utc::now();
581        let last_thread = now - chrono::Duration::days(8);
582        let storage = Arc::new(MockStorage::new(Some(last_thread)));
583
584        let loop_ = ThreadLoop::new(
585            Arc::new(MockThreadGenerator {
586                tweets: make_thread_tweets(),
587            }),
588            Arc::new(MockSafety {
589                can_tweet: true,
590                can_thread: false, // rate limited
591            }),
592            storage,
593            Arc::new(MockPoster::new()),
594            make_topics(),
595            604800,
596            false,
597        );
598
599        let mut recent = Vec::new();
600        let mut rng = rand::rng();
601        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
602        assert!(matches!(result, ThreadResult::RateLimited));
603    }
604
605    #[tokio::test]
606    async fn run_iteration_posts_when_no_previous_thread() {
607        let storage = Arc::new(MockStorage::new(None)); // No last thread
608        let poster = Arc::new(MockPoster::new());
609
610        let loop_ = ThreadLoop::new(
611            Arc::new(MockThreadGenerator {
612                tweets: make_thread_tweets(),
613            }),
614            Arc::new(MockSafety {
615                can_tweet: true,
616                can_thread: true,
617            }),
618            storage,
619            poster.clone(),
620            make_topics(),
621            604800,
622            false,
623        );
624
625        let mut recent = Vec::new();
626        let mut rng = rand::rng();
627        let result = loop_.run_iteration(&mut recent, 3, &mut rng).await;
628        assert!(matches!(result, ThreadResult::Posted { .. }));
629        assert_eq!(recent.len(), 1);
630    }
631
632    #[tokio::test]
633    async fn run_iteration_caps_recent_topics() {
634        let now = chrono::Utc::now();
635        let last_thread = now - chrono::Duration::days(8);
636        let storage = Arc::new(MockStorage::new(Some(last_thread)));
637        let poster = Arc::new(MockPoster::new());
638
639        let loop_ = ThreadLoop::new(
640            Arc::new(MockThreadGenerator {
641                tweets: make_thread_tweets(),
642            }),
643            Arc::new(MockSafety {
644                can_tweet: true,
645                can_thread: true,
646            }),
647            storage,
648            poster,
649            make_topics(),
650            604800,
651            false,
652        );
653
654        let mut recent = vec!["A".to_string(), "B".to_string(), "C".to_string()];
655        let max_recent = 3;
656        let mut rng = rand::rng();
657        let result = loop_.run_iteration(&mut recent, max_recent, &mut rng).await;
658        if matches!(result, ThreadResult::Posted { .. }) {
659            assert_eq!(recent.len(), max_recent);
660        }
661    }
662
663    #[test]
664    fn log_thread_result_dry_run_true() {
665        // Verify dry_run flag doesn't cause panics
666        ThreadLoop::log_thread_result(
667            &ThreadResult::Posted {
668                topic: "Rust".to_string(),
669                tweet_count: 5,
670                thread_id: "t2".to_string(),
671            },
672            true,
673        );
674    }
675
676    #[tokio::test]
677    async fn run_once_random_topic() {
678        let poster = Arc::new(MockPoster::new());
679        let loop_ = ThreadLoop::new(
680            Arc::new(MockThreadGenerator {
681                tweets: make_thread_tweets(),
682            }),
683            Arc::new(MockSafety {
684                can_tweet: true,
685                can_thread: true,
686            }),
687            Arc::new(MockStorage::new(None)),
688            poster,
689            make_topics(),
690            604800,
691            false,
692        );
693
694        let result = loop_.run_once(None, None).await;
695        assert!(matches!(result, ThreadResult::Posted { .. }));
696        if let ThreadResult::Posted { topic, .. } = result {
697            assert!(make_topics().contains(&topic));
698        }
699    }
700}