Skip to main content

tuitbot_core/automation/content_loop/
scheduler.rs

1//! Run loop, iteration logic, and slot/interval scheduling.
2//!
3//! Implements the `run`, `run_once`, `run_iteration`, `run_slot_iteration`,
4//! and `log_content_result` methods on [`ContentLoop`].
5
6use super::super::schedule::{apply_slot_jitter, schedule_gate, ActiveSchedule};
7use super::super::scheduler::LoopScheduler;
8use super::{ContentLoop, ContentResult};
9use rand::seq::IndexedRandom;
10use rand::SeedableRng;
11use std::sync::Arc;
12use tokio_util::sync::CancellationToken;
13
14impl ContentLoop {
15    /// Run the continuous content loop until cancellation.
16    pub async fn run(
17        &self,
18        cancel: CancellationToken,
19        scheduler: LoopScheduler,
20        schedule: Option<Arc<ActiveSchedule>>,
21    ) {
22        let slot_mode = schedule.as_ref().is_some_and(|s| s.has_preferred_times());
23
24        tracing::info!(
25            dry_run = self.dry_run,
26            topics = self.topics.len(),
27            window_secs = self.post_window_secs,
28            slot_mode = slot_mode,
29            "Content loop started"
30        );
31
32        if self.topics.is_empty() {
33            tracing::warn!("No topics configured, content loop has nothing to post");
34            cancel.cancelled().await;
35            return;
36        }
37
38        let min_recent = 3usize;
39        let max_recent = (self.topics.len() / 2)
40            .max(min_recent)
41            .min(self.topics.len());
42        let mut recent_topics: Vec<String> = Vec::with_capacity(max_recent);
43        let mut rng = rand::rngs::StdRng::from_rng(&mut rand::rng());
44
45        loop {
46            if cancel.is_cancelled() {
47                break;
48            }
49
50            if !schedule_gate(&schedule, &cancel).await {
51                break;
52            }
53
54            if slot_mode {
55                // Slot-based scheduling: post at preferred times
56                let sched = schedule.as_ref().expect("slot_mode requires schedule");
57
58                // Query today's post times from storage
59                let today_posts = match self.storage.todays_tweet_times().await {
60                    Ok(times) => times,
61                    Err(e) => {
62                        tracing::warn!(error = %e, "Failed to query today's tweet times");
63                        Vec::new()
64                    }
65                };
66
67                match sched.next_unused_slot(&today_posts) {
68                    Some((wait, slot)) => {
69                        let jittered_wait = apply_slot_jitter(wait);
70                        tracing::info!(
71                            slot = %slot.format(),
72                            wait_secs = jittered_wait.as_secs(),
73                            "Slot mode: sleeping until next posting slot"
74                        );
75
76                        tokio::select! {
77                            _ = cancel.cancelled() => break,
78                            _ = tokio::time::sleep(jittered_wait) => {},
79                        }
80
81                        if cancel.is_cancelled() {
82                            break;
83                        }
84
85                        // In slot mode, skip the elapsed-time check — post directly
86                        let result = self
87                            .run_slot_iteration(&mut recent_topics, max_recent, &mut rng)
88                            .await;
89                        self.log_content_result(&result);
90                    }
91                    None => {
92                        // All slots used today — sleep until next active day
93                        tracing::info!(
94                            "Slot mode: all slots used today, sleeping until next active period"
95                        );
96                        let wait = sched.time_until_active();
97                        if wait.is_zero() {
98                            // Currently active but all slots used — sleep 1 hour and recheck
99                            tokio::select! {
100                                _ = cancel.cancelled() => break,
101                                _ = tokio::time::sleep(std::time::Duration::from_secs(3600)) => {},
102                            }
103                        } else {
104                            tokio::select! {
105                                _ = cancel.cancelled() => break,
106                                _ = tokio::time::sleep(wait) => {},
107                            }
108                        }
109                    }
110                }
111            } else {
112                // Interval-based scheduling (existing behavior)
113                let result = self
114                    .run_iteration(&mut recent_topics, max_recent, &mut rng)
115                    .await;
116                self.log_content_result(&result);
117
118                tokio::select! {
119                    _ = cancel.cancelled() => break,
120                    _ = scheduler.tick() => {},
121                }
122            }
123        }
124
125        tracing::info!("Content loop stopped");
126    }
127
128    /// Log the result of a content iteration.
129    pub(super) fn log_content_result(&self, result: &ContentResult) {
130        match result {
131            ContentResult::Posted { topic, content } => {
132                tracing::info!(
133                    topic = %topic,
134                    chars = content.len(),
135                    dry_run = self.dry_run,
136                    "Content iteration: tweet posted"
137                );
138            }
139            ContentResult::TooSoon {
140                elapsed_secs,
141                window_secs,
142            } => {
143                tracing::debug!(
144                    elapsed = elapsed_secs,
145                    window = window_secs,
146                    "Content iteration: too soon since last tweet"
147                );
148            }
149            ContentResult::RateLimited => {
150                tracing::info!("Content iteration: daily tweet limit reached");
151            }
152            ContentResult::NoTopics => {
153                tracing::warn!("Content iteration: no topics available");
154            }
155            ContentResult::Failed { error } => {
156                tracing::warn!(error = %error, "Content iteration: failed");
157            }
158        }
159    }
160
161    /// Run a single iteration in slot mode (skips elapsed-time check).
162    pub(super) async fn run_slot_iteration(
163        &self,
164        recent_topics: &mut Vec<String>,
165        max_recent: usize,
166        rng: &mut impl rand::Rng,
167    ) -> ContentResult {
168        // Check for manually scheduled content due for posting
169        if let Some(result) = self.try_post_scheduled().await {
170            return result;
171        }
172
173        // Check safety (daily tweet limit)
174        if !self.safety.can_post_tweet().await {
175            return ContentResult::RateLimited;
176        }
177
178        // Pick a topic using epsilon-greedy if scorer is available
179        let topic = self.pick_topic_epsilon_greedy(recent_topics, rng).await;
180
181        let result = self.generate_and_post(&topic).await;
182
183        // Update recent_topics on success
184        if matches!(result, ContentResult::Posted { .. }) {
185            if recent_topics.len() >= max_recent {
186                recent_topics.remove(0);
187            }
188            recent_topics.push(topic);
189        }
190
191        result
192    }
193
194    /// Run a single content generation (for CLI `tuitbot post` command).
195    ///
196    /// If `topic` is provided, uses that topic. Otherwise picks a random
197    /// topic from the configured list.
198    pub async fn run_once(&self, topic: Option<&str>) -> ContentResult {
199        let chosen_topic = match topic {
200            Some(t) => t.to_string(),
201            None => {
202                if self.topics.is_empty() {
203                    return ContentResult::NoTopics;
204                }
205                let mut rng = rand::rng();
206                self.topics
207                    .choose(&mut rng)
208                    .expect("topics is non-empty")
209                    .clone()
210            }
211        };
212
213        // Skip window check for single-shot mode, but still check safety
214        if !self.safety.can_post_tweet().await {
215            return ContentResult::RateLimited;
216        }
217
218        self.generate_and_post(&chosen_topic).await
219    }
220
221    /// Run a single iteration of the continuous loop.
222    pub(super) async fn run_iteration(
223        &self,
224        recent_topics: &mut Vec<String>,
225        max_recent: usize,
226        rng: &mut impl rand::Rng,
227    ) -> ContentResult {
228        // Check for manually scheduled content due for posting
229        if let Some(result) = self.try_post_scheduled().await {
230            return result;
231        }
232
233        // Check elapsed time since last tweet
234        match self.storage.last_tweet_time().await {
235            Ok(Some(last_time)) => {
236                let elapsed = chrono::Utc::now()
237                    .signed_duration_since(last_time)
238                    .num_seconds()
239                    .max(0) as u64;
240
241                if elapsed < self.post_window_secs {
242                    return ContentResult::TooSoon {
243                        elapsed_secs: elapsed,
244                        window_secs: self.post_window_secs,
245                    };
246                }
247            }
248            Ok(None) => {
249                // No previous tweets -- proceed
250            }
251            Err(e) => {
252                tracing::warn!(error = %e, "Failed to query last tweet time, proceeding anyway");
253            }
254        }
255
256        // Check safety (daily tweet limit)
257        if !self.safety.can_post_tweet().await {
258            return ContentResult::RateLimited;
259        }
260
261        // Pick a topic using epsilon-greedy if scorer is available
262        let topic = self.pick_topic_epsilon_greedy(recent_topics, rng).await;
263
264        let result = self.generate_and_post(&topic).await;
265
266        // Update recent_topics on success
267        if matches!(result, ContentResult::Posted { .. }) {
268            if recent_topics.len() >= max_recent {
269                recent_topics.remove(0);
270            }
271            recent_topics.push(topic);
272        }
273
274        result
275    }
276}
277
278// ---------------------------------------------------------------------------
279// Tests
280// ---------------------------------------------------------------------------
281
282#[cfg(test)]
283mod tests {
284    use super::super::test_mocks::{make_topics, MockGenerator, MockSafety, MockStorage};
285    use super::super::{ContentLoop, ContentResult};
286    use std::sync::Arc;
287
288    #[tokio::test]
289    async fn run_once_posts_tweet() {
290        let storage = Arc::new(MockStorage::new(None));
291        let content = ContentLoop::new(
292            Arc::new(MockGenerator {
293                response: "Great tweet about Rust!".to_string(),
294            }),
295            Arc::new(MockSafety {
296                can_tweet: true,
297                can_thread: true,
298            }),
299            storage.clone(),
300            make_topics(),
301            14400,
302            false,
303        );
304
305        let result = content.run_once(Some("Rust")).await;
306        assert!(matches!(result, ContentResult::Posted { .. }));
307        assert_eq!(storage.posted_count(), 1);
308    }
309
310    #[tokio::test]
311    async fn run_once_dry_run_does_not_post() {
312        let storage = Arc::new(MockStorage::new(None));
313        let content = ContentLoop::new(
314            Arc::new(MockGenerator {
315                response: "Great tweet about Rust!".to_string(),
316            }),
317            Arc::new(MockSafety {
318                can_tweet: true,
319                can_thread: true,
320            }),
321            storage.clone(),
322            make_topics(),
323            14400,
324            true,
325        );
326
327        let result = content.run_once(Some("Rust")).await;
328        assert!(matches!(result, ContentResult::Posted { .. }));
329        assert_eq!(storage.posted_count(), 0); // Not posted in dry-run
330        assert_eq!(storage.action_count(), 1); // Action logged
331    }
332
333    #[tokio::test]
334    async fn run_once_rate_limited() {
335        let content = ContentLoop::new(
336            Arc::new(MockGenerator {
337                response: "tweet".to_string(),
338            }),
339            Arc::new(MockSafety {
340                can_tweet: false,
341                can_thread: true,
342            }),
343            Arc::new(MockStorage::new(None)),
344            make_topics(),
345            14400,
346            false,
347        );
348
349        let result = content.run_once(None).await;
350        assert!(matches!(result, ContentResult::RateLimited));
351    }
352
353    #[tokio::test]
354    async fn run_once_no_topics_returns_no_topics() {
355        let content = ContentLoop::new(
356            Arc::new(MockGenerator {
357                response: "tweet".to_string(),
358            }),
359            Arc::new(MockSafety {
360                can_tweet: true,
361                can_thread: true,
362            }),
363            Arc::new(MockStorage::new(None)),
364            Vec::new(),
365            14400,
366            false,
367        );
368
369        let result = content.run_once(None).await;
370        assert!(matches!(result, ContentResult::NoTopics));
371    }
372
373    #[tokio::test]
374    async fn run_iteration_skips_when_too_soon() {
375        let now = chrono::Utc::now();
376        // Last tweet was 1 hour ago, window is 4 hours
377        let last_tweet = now - chrono::Duration::hours(1);
378        let storage = Arc::new(MockStorage::new(Some(last_tweet)));
379
380        let content = ContentLoop::new(
381            Arc::new(MockGenerator {
382                response: "tweet".to_string(),
383            }),
384            Arc::new(MockSafety {
385                can_tweet: true,
386                can_thread: true,
387            }),
388            storage,
389            make_topics(),
390            14400, // 4 hours
391            false,
392        );
393
394        let mut recent = Vec::new();
395        let mut rng = rand::rng();
396        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
397        assert!(matches!(result, ContentResult::TooSoon { .. }));
398    }
399
400    #[tokio::test]
401    async fn run_iteration_posts_when_window_elapsed() {
402        let now = chrono::Utc::now();
403        // Last tweet was 5 hours ago, window is 4 hours
404        let last_tweet = now - chrono::Duration::hours(5);
405        let storage = Arc::new(MockStorage::new(Some(last_tweet)));
406
407        let content = ContentLoop::new(
408            Arc::new(MockGenerator {
409                response: "Fresh tweet!".to_string(),
410            }),
411            Arc::new(MockSafety {
412                can_tweet: true,
413                can_thread: true,
414            }),
415            storage.clone(),
416            make_topics(),
417            14400,
418            false,
419        );
420
421        let mut recent = Vec::new();
422        let mut rng = rand::rng();
423        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
424        assert!(matches!(result, ContentResult::Posted { .. }));
425        assert_eq!(storage.posted_count(), 1);
426        assert_eq!(recent.len(), 1);
427    }
428
429    // ---------------------------------------------------------------------------
430    // run() loop — cancellation coverage
431    // ---------------------------------------------------------------------------
432
433    #[tokio::test]
434    async fn run_cancels_immediately_with_topics() {
435        // Pre-cancel the token: loop should see is_cancelled() == true and exit
436        // without doing any work. Covers lines: slot_mode setup, loop entry, break.
437        use crate::automation::scheduler::LoopScheduler;
438        use std::time::Duration;
439        use tokio_util::sync::CancellationToken;
440
441        let cancel = CancellationToken::new();
442        cancel.cancel(); // already cancelled before run() is called
443
444        let content = ContentLoop::new(
445            Arc::new(MockGenerator {
446                response: "tweet".to_string(),
447            }),
448            Arc::new(MockSafety {
449                can_tweet: true,
450                can_thread: true,
451            }),
452            Arc::new(MockStorage::new(None)),
453            make_topics(),
454            3600,
455            false,
456        );
457
458        let scheduler =
459            LoopScheduler::new(Duration::from_secs(3600), Duration::ZERO, Duration::ZERO);
460        // Should return immediately — no panic, no post
461        content.run(cancel, scheduler, None).await;
462    }
463
464    #[tokio::test]
465    async fn run_no_topics_exits_on_cancel() {
466        // Empty topics: run() logs a warning then awaits cancel.
467        // Pre-cancelling means cancel.cancelled().await resolves immediately.
468        use crate::automation::scheduler::LoopScheduler;
469        use std::time::Duration;
470        use tokio_util::sync::CancellationToken;
471
472        let cancel = CancellationToken::new();
473        cancel.cancel();
474
475        let content = ContentLoop::new(
476            Arc::new(MockGenerator {
477                response: "tweet".to_string(),
478            }),
479            Arc::new(MockSafety {
480                can_tweet: true,
481                can_thread: true,
482            }),
483            Arc::new(MockStorage::new(None)),
484            vec![], // no topics
485            3600,
486            false,
487        );
488
489        let scheduler = LoopScheduler::new(Duration::from_secs(1), Duration::ZERO, Duration::ZERO);
490        content.run(cancel, scheduler, None).await;
491    }
492
493    #[tokio::test]
494    async fn run_interval_mode_one_iteration_then_cancel() {
495        // Interval mode with a very short scheduler interval.
496        // The loop runs one iteration, then gets cancelled via a background task.
497        use crate::automation::scheduler::LoopScheduler;
498        use std::time::Duration;
499        use tokio_util::sync::CancellationToken;
500
501        let cancel = CancellationToken::new();
502        let cancel_clone = cancel.clone();
503
504        let content = ContentLoop::new(
505            Arc::new(MockGenerator {
506                response: "interval tweet".to_string(),
507            }),
508            Arc::new(MockSafety {
509                can_tweet: true,
510                can_thread: true,
511            }),
512            Arc::new(MockStorage::new(None)),
513            make_topics(),
514            0, // post_window_secs=0 → always elapsed
515            false,
516        );
517
518        // Scheduler with 1ms interval so tick() resolves immediately
519        let scheduler =
520            LoopScheduler::new(Duration::from_millis(1), Duration::ZERO, Duration::ZERO);
521
522        // Cancel after 50ms to let one iteration complete
523        tokio::spawn(async move {
524            tokio::time::sleep(Duration::from_millis(50)).await;
525            cancel_clone.cancel();
526        });
527
528        tokio::time::timeout(Duration::from_secs(5), content.run(cancel, scheduler, None))
529            .await
530            .expect("run() should complete within timeout");
531    }
532
533    // ---------------------------------------------------------------------------
534    // log_content_result — all arms
535    // ---------------------------------------------------------------------------
536
537    #[tokio::test]
538    async fn log_content_result_all_variants() {
539        let content = ContentLoop::new(
540            Arc::new(MockGenerator {
541                response: "t".to_string(),
542            }),
543            Arc::new(MockSafety {
544                can_tweet: true,
545                can_thread: true,
546            }),
547            Arc::new(MockStorage::new(None)),
548            make_topics(),
549            3600,
550            false,
551        );
552
553        // Exercise every arm — no panics, no assertions needed (these are tracing calls)
554        content.log_content_result(&ContentResult::Posted {
555            topic: "Rust".to_string(),
556            content: "hello".to_string(),
557        });
558        content.log_content_result(&ContentResult::TooSoon {
559            elapsed_secs: 10,
560            window_secs: 3600,
561        });
562        content.log_content_result(&ContentResult::RateLimited);
563        content.log_content_result(&ContentResult::NoTopics);
564        content.log_content_result(&ContentResult::Failed {
565            error: "oops".to_string(),
566        });
567    }
568
569    // -----------------------------------------------------------------------
570    // Additional scheduler coverage tests
571    // -----------------------------------------------------------------------
572
573    #[tokio::test]
574    async fn run_once_with_specific_topic() {
575        let storage = Arc::new(MockStorage::new(None));
576        let content = ContentLoop::new(
577            Arc::new(MockGenerator {
578                response: "Topic-specific tweet".to_string(),
579            }),
580            Arc::new(MockSafety {
581                can_tweet: true,
582                can_thread: true,
583            }),
584            storage.clone(),
585            make_topics(),
586            14400,
587            false,
588        );
589
590        let result = content.run_once(Some("CLI tools")).await;
591        assert!(matches!(result, ContentResult::Posted { .. }));
592        if let ContentResult::Posted { topic, .. } = result {
593            assert_eq!(topic, "CLI tools");
594        }
595    }
596
597    #[tokio::test]
598    async fn run_once_random_topic_when_none() {
599        let storage = Arc::new(MockStorage::new(None));
600        let content = ContentLoop::new(
601            Arc::new(MockGenerator {
602                response: "Random topic tweet".to_string(),
603            }),
604            Arc::new(MockSafety {
605                can_tweet: true,
606                can_thread: true,
607            }),
608            storage.clone(),
609            make_topics(),
610            14400,
611            false,
612        );
613
614        let result = content.run_once(None).await;
615        assert!(matches!(result, ContentResult::Posted { .. }));
616    }
617
618    #[tokio::test]
619    async fn run_iteration_posts_when_no_previous_tweet() {
620        let storage = Arc::new(MockStorage::new(None)); // No last tweet
621
622        let content = ContentLoop::new(
623            Arc::new(MockGenerator {
624                response: "First ever tweet!".to_string(),
625            }),
626            Arc::new(MockSafety {
627                can_tweet: true,
628                can_thread: true,
629            }),
630            storage.clone(),
631            make_topics(),
632            14400,
633            false,
634        );
635
636        let mut recent = Vec::new();
637        let mut rng = rand::rng();
638        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
639        assert!(matches!(result, ContentResult::Posted { .. }));
640        assert_eq!(storage.posted_count(), 1);
641    }
642
643    #[tokio::test]
644    async fn run_iteration_rate_limited() {
645        let now = chrono::Utc::now();
646        let last_tweet = now - chrono::Duration::hours(5);
647        let storage = Arc::new(MockStorage::new(Some(last_tweet)));
648
649        let content = ContentLoop::new(
650            Arc::new(MockGenerator {
651                response: "tweet".to_string(),
652            }),
653            Arc::new(MockSafety {
654                can_tweet: false, // rate limited
655                can_thread: true,
656            }),
657            storage,
658            make_topics(),
659            14400,
660            false,
661        );
662
663        let mut recent = Vec::new();
664        let mut rng = rand::rng();
665        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
666        assert!(matches!(result, ContentResult::RateLimited));
667    }
668
669    #[tokio::test]
670    async fn run_slot_iteration_rate_limited() {
671        let storage = Arc::new(MockStorage::new(None));
672
673        let content = ContentLoop::new(
674            Arc::new(MockGenerator {
675                response: "tweet".to_string(),
676            }),
677            Arc::new(MockSafety {
678                can_tweet: false, // rate limited
679                can_thread: true,
680            }),
681            storage,
682            make_topics(),
683            14400,
684            false,
685        );
686
687        let mut recent = Vec::new();
688        let mut rng = rand::rng();
689        let result = content.run_slot_iteration(&mut recent, 3, &mut rng).await;
690        assert!(matches!(result, ContentResult::RateLimited));
691    }
692
693    #[tokio::test]
694    async fn run_slot_iteration_success_updates_recent() {
695        let storage = Arc::new(MockStorage::new(None));
696
697        let content = ContentLoop::new(
698            Arc::new(MockGenerator {
699                response: "Slot tweet!".to_string(),
700            }),
701            Arc::new(MockSafety {
702                can_tweet: true,
703                can_thread: true,
704            }),
705            storage.clone(),
706            make_topics(),
707            14400,
708            false,
709        );
710
711        let mut recent = Vec::new();
712        let mut rng = rand::rng();
713        let result = content.run_slot_iteration(&mut recent, 3, &mut rng).await;
714        assert!(matches!(result, ContentResult::Posted { .. }));
715        assert_eq!(recent.len(), 1);
716    }
717
718    #[tokio::test]
719    async fn run_slot_iteration_caps_recent_topics() {
720        let storage = Arc::new(MockStorage::new(None));
721
722        let content = ContentLoop::new(
723            Arc::new(MockGenerator {
724                response: "tweet".to_string(),
725            }),
726            Arc::new(MockSafety {
727                can_tweet: true,
728                can_thread: true,
729            }),
730            storage,
731            make_topics(),
732            14400,
733            false,
734        );
735
736        let mut recent = vec!["A".to_string(), "B".to_string(), "C".to_string()];
737        let max_recent = 3;
738        let mut rng = rand::rng();
739        let result = content
740            .run_slot_iteration(&mut recent, max_recent, &mut rng)
741            .await;
742        if matches!(result, ContentResult::Posted { .. }) {
743            // Recent should have removed oldest and added new
744            assert_eq!(recent.len(), max_recent);
745        }
746    }
747
748    #[tokio::test]
749    async fn run_iteration_updates_recent_on_success() {
750        let now = chrono::Utc::now();
751        let last_tweet = now - chrono::Duration::hours(5);
752        let storage = Arc::new(MockStorage::new(Some(last_tweet)));
753
754        let content = ContentLoop::new(
755            Arc::new(MockGenerator {
756                response: "tweet".to_string(),
757            }),
758            Arc::new(MockSafety {
759                can_tweet: true,
760                can_thread: true,
761            }),
762            storage,
763            make_topics(),
764            14400,
765            false,
766        );
767
768        let mut recent = Vec::new();
769        let mut rng = rand::rng();
770        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
771        assert!(matches!(result, ContentResult::Posted { .. }));
772        assert_eq!(recent.len(), 1);
773    }
774}