1use super::super::schedule::{apply_slot_jitter, schedule_gate, ActiveSchedule};
7use super::super::scheduler::LoopScheduler;
8use super::{ContentLoop, ContentResult};
9use rand::seq::IndexedRandom;
10use rand::SeedableRng;
11use std::sync::Arc;
12use tokio_util::sync::CancellationToken;
13
14impl ContentLoop {
15 pub async fn run(
17 &self,
18 cancel: CancellationToken,
19 scheduler: LoopScheduler,
20 schedule: Option<Arc<ActiveSchedule>>,
21 ) {
22 let slot_mode = schedule.as_ref().is_some_and(|s| s.has_preferred_times());
23
24 tracing::info!(
25 dry_run = self.dry_run,
26 topics = self.topics.len(),
27 window_secs = self.post_window_secs,
28 slot_mode = slot_mode,
29 "Content loop started"
30 );
31
32 if self.topics.is_empty() {
33 tracing::warn!("No topics configured, content loop has nothing to post");
34 cancel.cancelled().await;
35 return;
36 }
37
38 let min_recent = 3usize;
39 let max_recent = (self.topics.len() / 2)
40 .max(min_recent)
41 .min(self.topics.len());
42 let mut recent_topics: Vec<String> = Vec::with_capacity(max_recent);
43 let mut rng = rand::rngs::StdRng::from_rng(&mut rand::rng());
44
45 loop {
46 if cancel.is_cancelled() {
47 break;
48 }
49
50 if !schedule_gate(&schedule, &cancel).await {
51 break;
52 }
53
54 if slot_mode {
55 let sched = schedule.as_ref().expect("slot_mode requires schedule");
57
58 let today_posts = match self.storage.todays_tweet_times().await {
60 Ok(times) => times,
61 Err(e) => {
62 tracing::warn!(error = %e, "Failed to query today's tweet times");
63 Vec::new()
64 }
65 };
66
67 match sched.next_unused_slot(&today_posts) {
68 Some((wait, slot)) => {
69 let jittered_wait = apply_slot_jitter(wait);
70 tracing::info!(
71 slot = %slot.format(),
72 wait_secs = jittered_wait.as_secs(),
73 "Slot mode: sleeping until next posting slot"
74 );
75
76 tokio::select! {
77 _ = cancel.cancelled() => break,
78 _ = tokio::time::sleep(jittered_wait) => {},
79 }
80
81 if cancel.is_cancelled() {
82 break;
83 }
84
85 let result = self
87 .run_slot_iteration(&mut recent_topics, max_recent, &mut rng)
88 .await;
89 self.log_content_result(&result);
90 }
91 None => {
92 tracing::info!(
94 "Slot mode: all slots used today, sleeping until next active period"
95 );
96 let wait = sched.time_until_active();
97 if wait.is_zero() {
98 tokio::select! {
100 _ = cancel.cancelled() => break,
101 _ = tokio::time::sleep(std::time::Duration::from_secs(3600)) => {},
102 }
103 } else {
104 tokio::select! {
105 _ = cancel.cancelled() => break,
106 _ = tokio::time::sleep(wait) => {},
107 }
108 }
109 }
110 }
111 } else {
112 let result = self
114 .run_iteration(&mut recent_topics, max_recent, &mut rng)
115 .await;
116 self.log_content_result(&result);
117
118 tokio::select! {
119 _ = cancel.cancelled() => break,
120 _ = scheduler.tick() => {},
121 }
122 }
123 }
124
125 tracing::info!("Content loop stopped");
126 }
127
128 pub(super) fn log_content_result(&self, result: &ContentResult) {
130 match result {
131 ContentResult::Posted { topic, content } => {
132 tracing::info!(
133 topic = %topic,
134 chars = content.len(),
135 dry_run = self.dry_run,
136 "Content iteration: tweet posted"
137 );
138 }
139 ContentResult::TooSoon {
140 elapsed_secs,
141 window_secs,
142 } => {
143 tracing::debug!(
144 elapsed = elapsed_secs,
145 window = window_secs,
146 "Content iteration: too soon since last tweet"
147 );
148 }
149 ContentResult::RateLimited => {
150 tracing::info!("Content iteration: daily tweet limit reached");
151 }
152 ContentResult::NoTopics => {
153 tracing::warn!("Content iteration: no topics available");
154 }
155 ContentResult::Failed { error } => {
156 tracing::warn!(error = %error, "Content iteration: failed");
157 }
158 }
159 }
160
161 pub(super) async fn run_slot_iteration(
163 &self,
164 recent_topics: &mut Vec<String>,
165 max_recent: usize,
166 rng: &mut impl rand::Rng,
167 ) -> ContentResult {
168 if let Some(result) = self.try_post_scheduled().await {
170 return result;
171 }
172
173 if !self.safety.can_post_tweet().await {
175 return ContentResult::RateLimited;
176 }
177
178 let topic = self.pick_topic_epsilon_greedy(recent_topics, rng).await;
180
181 let result = self.generate_and_post(&topic).await;
182
183 if matches!(result, ContentResult::Posted { .. }) {
185 if recent_topics.len() >= max_recent {
186 recent_topics.remove(0);
187 }
188 recent_topics.push(topic);
189 }
190
191 result
192 }
193
194 pub async fn run_once(&self, topic: Option<&str>) -> ContentResult {
199 let chosen_topic = match topic {
200 Some(t) => t.to_string(),
201 None => {
202 if self.topics.is_empty() {
203 return ContentResult::NoTopics;
204 }
205 let mut rng = rand::rng();
206 self.topics
207 .choose(&mut rng)
208 .expect("topics is non-empty")
209 .clone()
210 }
211 };
212
213 if !self.safety.can_post_tweet().await {
215 return ContentResult::RateLimited;
216 }
217
218 self.generate_and_post(&chosen_topic).await
219 }
220
221 pub(super) async fn run_iteration(
223 &self,
224 recent_topics: &mut Vec<String>,
225 max_recent: usize,
226 rng: &mut impl rand::Rng,
227 ) -> ContentResult {
228 if let Some(result) = self.try_post_scheduled().await {
230 return result;
231 }
232
233 match self.storage.last_tweet_time().await {
235 Ok(Some(last_time)) => {
236 let elapsed = chrono::Utc::now()
237 .signed_duration_since(last_time)
238 .num_seconds()
239 .max(0) as u64;
240
241 if elapsed < self.post_window_secs {
242 return ContentResult::TooSoon {
243 elapsed_secs: elapsed,
244 window_secs: self.post_window_secs,
245 };
246 }
247 }
248 Ok(None) => {
249 }
251 Err(e) => {
252 tracing::warn!(error = %e, "Failed to query last tweet time, proceeding anyway");
253 }
254 }
255
256 if !self.safety.can_post_tweet().await {
258 return ContentResult::RateLimited;
259 }
260
261 let topic = self.pick_topic_epsilon_greedy(recent_topics, rng).await;
263
264 let result = self.generate_and_post(&topic).await;
265
266 if matches!(result, ContentResult::Posted { .. }) {
268 if recent_topics.len() >= max_recent {
269 recent_topics.remove(0);
270 }
271 recent_topics.push(topic);
272 }
273
274 result
275 }
276}
277
#[cfg(test)]
mod tests {
    //! Unit tests for the content loop's run modes and single iterations,
    //! driven entirely by the in-crate mocks.

    use super::super::test_mocks::{make_topics, MockGenerator, MockSafety, MockStorage};
    use super::super::{ContentLoop, ContentResult};
    use crate::automation::scheduler::LoopScheduler;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    /// Builds a `ContentLoop` over `make_topics()` with a generator that always
    /// returns `response` and a safety mock whose tweet permission is
    /// `can_tweet` (thread permission is always granted).
    fn build_loop(
        response: &str,
        can_tweet: bool,
        storage: Arc<MockStorage>,
        window_secs: u64,
        dry_run: bool,
    ) -> ContentLoop {
        ContentLoop::new(
            Arc::new(MockGenerator {
                response: response.to_string(),
            }),
            Arc::new(MockSafety {
                can_tweet,
                can_thread: true,
            }),
            storage,
            make_topics(),
            window_secs,
            dry_run,
        )
    }

    /// Mock storage with no previous tweet recorded.
    fn empty_storage() -> Arc<MockStorage> {
        Arc::new(MockStorage::new(None))
    }

    /// Mock storage whose last tweet was recorded `hours` hours ago.
    fn storage_with_last_tweet(hours: i64) -> Arc<MockStorage> {
        let last_tweet = chrono::Utc::now() - chrono::Duration::hours(hours);
        Arc::new(MockStorage::new(Some(last_tweet)))
    }

    #[tokio::test]
    async fn run_once_posts_tweet() {
        let storage = empty_storage();
        let content = build_loop("Great tweet about Rust!", true, storage.clone(), 14400, false);

        let result = content.run_once(Some("Rust")).await;
        assert!(matches!(result, ContentResult::Posted { .. }));
        assert_eq!(storage.posted_count(), 1);
    }

    #[tokio::test]
    async fn run_once_dry_run_does_not_post() {
        let storage = empty_storage();
        let content = build_loop("Great tweet about Rust!", true, storage.clone(), 14400, true);

        let result = content.run_once(Some("Rust")).await;
        assert!(matches!(result, ContentResult::Posted { .. }));
        // Dry run: nothing is posted, but exactly one action is recorded.
        assert_eq!(storage.posted_count(), 0);
        assert_eq!(storage.action_count(), 1);
    }

    #[tokio::test]
    async fn run_once_rate_limited() {
        let content = build_loop("tweet", false, empty_storage(), 14400, false);

        let result = content.run_once(None).await;
        assert!(matches!(result, ContentResult::RateLimited));
    }

    #[tokio::test]
    async fn run_once_no_topics_returns_no_topics() {
        // Built by hand because the helper always supplies topics.
        let content = ContentLoop::new(
            Arc::new(MockGenerator {
                response: "tweet".to_string(),
            }),
            Arc::new(MockSafety {
                can_tweet: true,
                can_thread: true,
            }),
            empty_storage(),
            Vec::new(),
            14400,
            false,
        );

        let result = content.run_once(None).await;
        assert!(matches!(result, ContentResult::NoTopics));
    }

    #[tokio::test]
    async fn run_iteration_skips_when_too_soon() {
        // Last tweet one hour ago, window four hours: still inside the window.
        let content = build_loop("tweet", true, storage_with_last_tweet(1), 14400, false);

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ContentResult::TooSoon { .. }));
    }

    #[tokio::test]
    async fn run_iteration_posts_when_window_elapsed() {
        // Last tweet five hours ago, window four hours: the window has passed.
        let storage = storage_with_last_tweet(5);
        let content = build_loop("Fresh tweet!", true, storage.clone(), 14400, false);

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ContentResult::Posted { .. }));
        assert_eq!(storage.posted_count(), 1);
        assert_eq!(recent.len(), 1);
    }

    #[tokio::test]
    async fn run_cancels_immediately_with_topics() {
        // Token is cancelled before `run` starts, so it must return promptly.
        let cancel = CancellationToken::new();
        cancel.cancel();

        let content = build_loop("tweet", true, empty_storage(), 3600, false);

        let scheduler =
            LoopScheduler::new(Duration::from_secs(3600), Duration::ZERO, Duration::ZERO);
        content.run(cancel, scheduler, None).await;
    }

    #[tokio::test]
    async fn run_no_topics_exits_on_cancel() {
        let cancel = CancellationToken::new();
        cancel.cancel();

        // Built by hand because the helper always supplies topics.
        let content = ContentLoop::new(
            Arc::new(MockGenerator {
                response: "tweet".to_string(),
            }),
            Arc::new(MockSafety {
                can_tweet: true,
                can_thread: true,
            }),
            empty_storage(),
            vec![],
            3600,
            false,
        );

        let scheduler = LoopScheduler::new(Duration::from_secs(1), Duration::ZERO, Duration::ZERO);
        content.run(cancel, scheduler, None).await;
    }

    #[tokio::test]
    async fn run_interval_mode_one_iteration_then_cancel() {
        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();

        // A zero-second window means the elapsed-time check never blocks.
        let content = build_loop("interval tweet", true, empty_storage(), 0, false);

        let scheduler =
            LoopScheduler::new(Duration::from_millis(1), Duration::ZERO, Duration::ZERO);

        // Cancel shortly after start so the loop gets to run before stopping.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            cancel_clone.cancel();
        });

        tokio::time::timeout(Duration::from_secs(5), content.run(cancel, scheduler, None))
            .await
            .expect("run() should complete within timeout");
    }

    #[tokio::test]
    async fn log_content_result_all_variants() {
        // Smoke test: logging must not panic for any variant.
        let content = build_loop("t", true, empty_storage(), 3600, false);

        content.log_content_result(&ContentResult::Posted {
            topic: "Rust".to_string(),
            content: "hello".to_string(),
        });
        content.log_content_result(&ContentResult::TooSoon {
            elapsed_secs: 10,
            window_secs: 3600,
        });
        content.log_content_result(&ContentResult::RateLimited);
        content.log_content_result(&ContentResult::NoTopics);
        content.log_content_result(&ContentResult::Failed {
            error: "oops".to_string(),
        });
    }

    #[tokio::test]
    async fn run_once_with_specific_topic() {
        let content = build_loop("Topic-specific tweet", true, empty_storage(), 14400, false);

        let result = content.run_once(Some("CLI tools")).await;
        let ContentResult::Posted { topic, .. } = result else {
            panic!("expected ContentResult::Posted");
        };
        assert_eq!(topic, "CLI tools");
    }

    #[tokio::test]
    async fn run_once_random_topic_when_none() {
        let content = build_loop("Random topic tweet", true, empty_storage(), 14400, false);

        let result = content.run_once(None).await;
        assert!(matches!(result, ContentResult::Posted { .. }));
    }

    #[tokio::test]
    async fn run_iteration_posts_when_no_previous_tweet() {
        let storage = empty_storage();
        let content = build_loop("First ever tweet!", true, storage.clone(), 14400, false);

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ContentResult::Posted { .. }));
        assert_eq!(storage.posted_count(), 1);
    }

    #[tokio::test]
    async fn run_iteration_rate_limited() {
        // The window has elapsed, so only the safety check can stop the post.
        let content = build_loop("tweet", false, storage_with_last_tweet(5), 14400, false);

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ContentResult::RateLimited));
    }

    #[tokio::test]
    async fn run_slot_iteration_rate_limited() {
        let content = build_loop("tweet", false, empty_storage(), 14400, false);

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = content.run_slot_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ContentResult::RateLimited));
    }

    #[tokio::test]
    async fn run_slot_iteration_success_updates_recent() {
        let content = build_loop("Slot tweet!", true, empty_storage(), 14400, false);

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = content.run_slot_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ContentResult::Posted { .. }));
        assert_eq!(recent.len(), 1);
    }

    #[tokio::test]
    async fn run_slot_iteration_caps_recent_topics() {
        let content = build_loop("tweet", true, empty_storage(), 14400, false);

        // Start with the recency list already full.
        let mut recent = vec!["A".to_string(), "B".to_string(), "C".to_string()];
        let max_recent = 3;
        let mut rng = rand::rng();
        let result = content
            .run_slot_iteration(&mut recent, max_recent, &mut rng)
            .await;

        // Assert unconditionally so the test cannot pass without a post.
        assert!(matches!(result, ContentResult::Posted { .. }));
        assert_eq!(recent.len(), max_recent);
        // The oldest entry is evicted and the survivors keep their order.
        assert_eq!(recent[0], "B");
        assert_eq!(recent[1], "C");
    }

    #[tokio::test]
    async fn run_iteration_updates_recent_on_success() {
        let content = build_loop("tweet", true, storage_with_last_tweet(5), 14400, false);

        let mut recent = Vec::new();
        let mut rng = rand::rng();
        let result = content.run_iteration(&mut recent, 3, &mut rng).await;
        assert!(matches!(result, ContentResult::Posted { .. }));
        assert_eq!(recent.len(), 1);
    }
}