1use rand::Rng;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12use tokio_util::sync::CancellationToken;
13
14use super::circuit_breaker::CircuitBreaker;
15
16pub const QUEUE_CAPACITY: usize = 100;
18
19pub enum PostAction {
24 Reply {
26 tweet_id: String,
28 content: String,
30 media_ids: Vec<String>,
32 result_tx: Option<oneshot::Sender<Result<String, String>>>,
34 },
35 Tweet {
37 content: String,
39 media_ids: Vec<String>,
41 result_tx: Option<oneshot::Sender<Result<String, String>>>,
43 },
44 ThreadTweet {
46 content: String,
48 in_reply_to: String,
50 media_ids: Vec<String>,
52 result_tx: Option<oneshot::Sender<Result<String, String>>>,
54 },
55}
56
57impl std::fmt::Debug for PostAction {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 PostAction::Reply {
61 tweet_id,
62 content,
63 media_ids,
64 ..
65 } => f
66 .debug_struct("Reply")
67 .field("tweet_id", tweet_id)
68 .field("content_len", &content.len())
69 .field("media_count", &media_ids.len())
70 .finish(),
71 PostAction::Tweet {
72 content, media_ids, ..
73 } => f
74 .debug_struct("Tweet")
75 .field("content_len", &content.len())
76 .field("media_count", &media_ids.len())
77 .finish(),
78 PostAction::ThreadTweet {
79 content,
80 in_reply_to,
81 media_ids,
82 ..
83 } => f
84 .debug_struct("ThreadTweet")
85 .field("in_reply_to", in_reply_to)
86 .field("content_len", &content.len())
87 .field("media_count", &media_ids.len())
88 .finish(),
89 }
90 }
91}
92
93#[async_trait::async_trait]
98pub trait PostExecutor: Send + Sync {
99 async fn execute_reply(
101 &self,
102 tweet_id: &str,
103 content: &str,
104 media_ids: &[String],
105 ) -> Result<String, String>;
106
107 async fn execute_tweet(&self, content: &str, media_ids: &[String]) -> Result<String, String>;
109}
110
111pub fn create_posting_queue() -> (mpsc::Sender<PostAction>, mpsc::Receiver<PostAction>) {
116 mpsc::channel(QUEUE_CAPACITY)
117}
118
119#[async_trait::async_trait]
121pub trait ApprovalQueue: Send + Sync {
122 async fn queue_reply(
124 &self,
125 tweet_id: &str,
126 content: &str,
127 media_paths: &[String],
128 ) -> Result<i64, String>;
129
130 async fn queue_tweet(&self, content: &str, media_paths: &[String]) -> Result<i64, String>;
132}
133
134pub async fn run_posting_queue(
142 receiver: mpsc::Receiver<PostAction>,
143 executor: Arc<dyn PostExecutor>,
144 min_delay: Duration,
145 cancel: CancellationToken,
146) {
147 run_posting_queue_with_approval(receiver, executor, None, min_delay, min_delay, None, cancel)
148 .await;
149}
150
151pub async fn run_posting_queue_with_approval(
157 mut receiver: mpsc::Receiver<PostAction>,
158 executor: Arc<dyn PostExecutor>,
159 approval_queue: Option<Arc<dyn ApprovalQueue>>,
160 min_delay: Duration,
161 max_delay: Duration,
162 circuit_breaker: Option<Arc<CircuitBreaker>>,
163 cancel: CancellationToken,
164) {
165 tracing::info!("Posting queue consumer started");
166
167 loop {
168 let action = tokio::select! {
169 biased;
170 _ = cancel.cancelled() => {
171 tracing::info!("Posting queue received cancellation, draining remaining actions");
172 break;
173 }
174 action = receiver.recv() => {
175 match action {
176 Some(a) => a,
177 None => {
178 tracing::info!("Posting queue channel closed");
179 break;
180 }
181 }
182 }
183 };
184
185 if approval_queue.is_none() {
187 if let Some(ref cb) = circuit_breaker {
188 if !cb.should_allow_mutation().await {
189 tracing::warn!("Circuit breaker open — waiting before posting");
190 if !cb.wait_until_closed(&cancel).await {
191 tracing::info!("Cancelled while waiting for circuit breaker");
192 break;
193 }
194 }
195 }
196 }
197
198 let result = execute_or_queue(action, &executor, &approval_queue).await;
199
200 if approval_queue.is_none() {
202 if let Some(ref cb) = circuit_breaker {
203 match result {
204 PostResult::Success => {
205 cb.record_success().await;
206 }
207 PostResult::Error(ref msg) if is_rate_limit_error(msg) => {
208 cb.record_error().await;
209 }
210 _ => {}
211 }
212 }
213 }
214
215 let delay = randomized_delay(min_delay, max_delay);
216 if !delay.is_zero() {
217 tokio::time::sleep(delay).await;
218 }
219 }
220
221 let mut drained = 0u32;
223 while let Ok(action) = receiver.try_recv() {
224 execute_or_queue(action, &executor, &approval_queue).await;
225 drained += 1;
226 }
227
228 if drained > 0 {
229 tracing::info!(
230 count = drained,
231 "Drained remaining actions from posting queue"
232 );
233 }
234
235 tracing::info!("Posting queue consumer stopped");
236}
237
238fn is_rate_limit_error(msg: &str) -> bool {
240 let lower = msg.to_lowercase();
241 lower.contains("rate limit")
242 || lower.contains("too many requests")
243 || lower.contains("429")
244 || lower.contains("forbidden")
245 || lower.contains("403")
246}
247
248enum PostResult {
250 Success,
251 Error(String),
252 Queued,
253}
254
255async fn execute_or_queue(
257 action: PostAction,
258 executor: &Arc<dyn PostExecutor>,
259 approval_queue: &Option<Arc<dyn ApprovalQueue>>,
260) -> PostResult {
261 if let Some(queue) = approval_queue {
262 queue_for_approval(action, queue).await;
263 PostResult::Queued
264 } else {
265 execute_and_respond(action, executor).await
266 }
267}
268
269async fn queue_for_approval(action: PostAction, queue: &Arc<dyn ApprovalQueue>) {
271 let (result, result_tx) = match action {
272 PostAction::Reply {
273 tweet_id,
274 content,
275 media_ids: _,
276 result_tx,
277 } => {
278 tracing::info!(tweet_id = %tweet_id, "Queuing reply for approval");
279 let r = queue
280 .queue_reply(&tweet_id, &content, &[])
281 .await
282 .map(|id| format!("queued:{id}"));
283 (r, result_tx)
284 }
285 PostAction::Tweet {
286 content,
287 media_ids: _,
288 result_tx,
289 } => {
290 tracing::info!("Queuing tweet for approval");
291 let r = queue
292 .queue_tweet(&content, &[])
293 .await
294 .map(|id| format!("queued:{id}"));
295 (r, result_tx)
296 }
297 PostAction::ThreadTweet {
298 content,
299 in_reply_to,
300 media_ids: _,
301 result_tx,
302 } => {
303 tracing::info!(in_reply_to = %in_reply_to, "Queuing thread tweet for approval");
304 let r = queue
305 .queue_reply(&in_reply_to, &content, &[])
306 .await
307 .map(|id| format!("queued:{id}"));
308 (r, result_tx)
309 }
310 };
311
312 match &result {
313 Ok(id) => tracing::info!(queue_id = %id, "Action queued for approval"),
314 Err(e) => tracing::warn!(error = %e, "Failed to queue action for approval"),
315 }
316
317 if let Some(tx) = result_tx {
318 let _ = tx.send(result);
319 }
320}
321
322async fn execute_and_respond(action: PostAction, executor: &Arc<dyn PostExecutor>) -> PostResult {
324 let (result, result_tx) = match action {
325 PostAction::Reply {
326 tweet_id,
327 content,
328 media_ids,
329 result_tx,
330 } => {
331 tracing::debug!(tweet_id = %tweet_id, "Executing reply action");
332 let r = executor
333 .execute_reply(&tweet_id, &content, &media_ids)
334 .await;
335 (r, result_tx)
336 }
337 PostAction::Tweet {
338 content,
339 media_ids,
340 result_tx,
341 } => {
342 tracing::debug!("Executing tweet action");
343 let r = executor.execute_tweet(&content, &media_ids).await;
344 (r, result_tx)
345 }
346 PostAction::ThreadTweet {
347 content,
348 in_reply_to,
349 media_ids,
350 result_tx,
351 } => {
352 tracing::debug!(in_reply_to = %in_reply_to, "Executing thread tweet action");
353 let r = executor
354 .execute_reply(&in_reply_to, &content, &media_ids)
355 .await;
356 (r, result_tx)
357 }
358 };
359
360 let post_result = match &result {
361 Ok(id) => {
362 tracing::info!(tweet_id = %id, "Post action succeeded");
363 PostResult::Success
364 }
365 Err(e) => {
366 tracing::warn!(error = %e, "Post action failed");
367 PostResult::Error(e.clone())
368 }
369 };
370
371 if let Some(tx) = result_tx {
372 let _ = tx.send(result);
374 }
375
376 post_result
377}
378
379fn randomized_delay(min: Duration, max: Duration) -> Duration {
381 if min >= max || min.is_zero() && max.is_zero() {
382 return min;
383 }
384 let min_ms = min.as_millis() as u64;
385 let max_ms = max.as_millis() as u64;
386 Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms))
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392 use std::sync::Mutex;
393
394 struct MockExecutor {
396 calls: Mutex<Vec<(String, String)>>,
397 fail: bool,
398 }
399
400 impl MockExecutor {
401 fn new() -> Self {
402 Self {
403 calls: Mutex::new(Vec::new()),
404 fail: false,
405 }
406 }
407
408 fn failing() -> Self {
409 Self {
410 calls: Mutex::new(Vec::new()),
411 fail: true,
412 }
413 }
414
415 fn call_count(&self) -> usize {
416 self.calls.lock().expect("lock poisoned").len()
417 }
418
419 fn calls(&self) -> Vec<(String, String)> {
420 self.calls.lock().expect("lock poisoned").clone()
421 }
422 }
423
424 #[async_trait::async_trait]
425 impl PostExecutor for MockExecutor {
426 async fn execute_reply(
427 &self,
428 tweet_id: &str,
429 content: &str,
430 _media_ids: &[String],
431 ) -> Result<String, String> {
432 self.calls
433 .lock()
434 .expect("lock poisoned")
435 .push(("reply".to_string(), format!("{tweet_id}:{content}")));
436 if self.fail {
437 Err("mock error".to_string())
438 } else {
439 Ok("reply-id-123".to_string())
440 }
441 }
442
443 async fn execute_tweet(
444 &self,
445 content: &str,
446 _media_ids: &[String],
447 ) -> Result<String, String> {
448 self.calls
449 .lock()
450 .expect("lock poisoned")
451 .push(("tweet".to_string(), content.to_string()));
452 if self.fail {
453 Err("mock error".to_string())
454 } else {
455 Ok("tweet-id-456".to_string())
456 }
457 }
458 }
459
460 #[tokio::test]
461 async fn process_reply_action() {
462 let executor = Arc::new(MockExecutor::new());
463 let (tx, rx) = create_posting_queue();
464 let cancel = CancellationToken::new();
465
466 let cancel_clone = cancel.clone();
467 let exec_clone = executor.clone();
468 let handle = tokio::spawn(async move {
469 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
470 });
471
472 let (result_tx, result_rx) = oneshot::channel();
473 tx.send(PostAction::Reply {
474 tweet_id: "t1".to_string(),
475 content: "hello".to_string(),
476 media_ids: vec![],
477 result_tx: Some(result_tx),
478 })
479 .await
480 .expect("send failed");
481
482 let result = result_rx.await.expect("oneshot recv");
483 assert_eq!(result, Ok("reply-id-123".to_string()));
484
485 cancel.cancel();
486 handle.await.expect("join");
487 assert_eq!(executor.call_count(), 1);
488 }
489
490 #[tokio::test]
491 async fn process_tweet_action() {
492 let executor = Arc::new(MockExecutor::new());
493 let (tx, rx) = create_posting_queue();
494 let cancel = CancellationToken::new();
495
496 let cancel_clone = cancel.clone();
497 let exec_clone = executor.clone();
498 let handle = tokio::spawn(async move {
499 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
500 });
501
502 let (result_tx, result_rx) = oneshot::channel();
503 tx.send(PostAction::Tweet {
504 content: "my tweet".to_string(),
505 media_ids: vec![],
506 result_tx: Some(result_tx),
507 })
508 .await
509 .expect("send failed");
510
511 let result = result_rx.await.expect("oneshot recv");
512 assert_eq!(result, Ok("tweet-id-456".to_string()));
513
514 cancel.cancel();
515 handle.await.expect("join");
516 }
517
518 #[tokio::test]
519 async fn process_thread_tweet_action() {
520 let executor = Arc::new(MockExecutor::new());
521 let (tx, rx) = create_posting_queue();
522 let cancel = CancellationToken::new();
523
524 let cancel_clone = cancel.clone();
525 let exec_clone = executor.clone();
526 let handle = tokio::spawn(async move {
527 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
528 });
529
530 let (result_tx, result_rx) = oneshot::channel();
531 tx.send(PostAction::ThreadTweet {
532 content: "thread part 2".to_string(),
533 in_reply_to: "prev-id".to_string(),
534 media_ids: vec![],
535 result_tx: Some(result_tx),
536 })
537 .await
538 .expect("send failed");
539
540 let result = result_rx.await.expect("oneshot recv");
541 assert_eq!(result, Ok("reply-id-123".to_string()));
542
543 cancel.cancel();
544 handle.await.expect("join");
545
546 let calls = executor.calls();
547 assert_eq!(calls[0].0, "reply");
548 assert!(calls[0].1.contains("prev-id"));
549 }
550
551 #[tokio::test]
552 async fn result_tx_none_does_not_panic() {
553 let executor = Arc::new(MockExecutor::new());
554 let (tx, rx) = create_posting_queue();
555 let cancel = CancellationToken::new();
556
557 let cancel_clone = cancel.clone();
558 let exec_clone = executor.clone();
559 let handle = tokio::spawn(async move {
560 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
561 });
562
563 tx.send(PostAction::Tweet {
564 content: "fire and forget".to_string(),
565 media_ids: vec![],
566 result_tx: None,
567 })
568 .await
569 .expect("send failed");
570
571 tokio::time::sleep(Duration::from_millis(50)).await;
573
574 cancel.cancel();
575 handle.await.expect("join");
576 assert_eq!(executor.call_count(), 1);
577 }
578
579 #[tokio::test]
580 async fn failed_action_sends_error_back() {
581 let executor = Arc::new(MockExecutor::failing());
582 let (tx, rx) = create_posting_queue();
583 let cancel = CancellationToken::new();
584
585 let cancel_clone = cancel.clone();
586 let exec_clone = executor.clone();
587 let handle = tokio::spawn(async move {
588 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
589 });
590
591 let (result_tx, result_rx) = oneshot::channel();
592 tx.send(PostAction::Tweet {
593 content: "will fail".to_string(),
594 media_ids: vec![],
595 result_tx: Some(result_tx),
596 })
597 .await
598 .expect("send failed");
599
600 let result = result_rx.await.expect("oneshot recv");
601 assert!(result.is_err());
602 assert_eq!(result.unwrap_err(), "mock error");
603
604 cancel.cancel();
605 handle.await.expect("join");
606 }
607
608 #[tokio::test]
609 async fn channel_close_exits_consumer() {
610 let executor = Arc::new(MockExecutor::new());
611 let (tx, rx) = create_posting_queue();
612 let cancel = CancellationToken::new();
613
614 let handle = tokio::spawn(async move {
615 run_posting_queue(rx, executor, Duration::ZERO, cancel).await;
616 });
617
618 drop(tx);
620
621 handle.await.expect("join");
623 }
624
625 #[tokio::test]
626 async fn drain_on_cancel() {
627 let executor = Arc::new(MockExecutor::new());
628 let (tx, rx) = create_posting_queue();
629 let cancel = CancellationToken::new();
630
631 tx.send(PostAction::Tweet {
633 content: "queued1".to_string(),
634 media_ids: vec![],
635 result_tx: None,
636 })
637 .await
638 .expect("send");
639 tx.send(PostAction::Tweet {
640 content: "queued2".to_string(),
641 media_ids: vec![],
642 result_tx: None,
643 })
644 .await
645 .expect("send");
646
647 cancel.cancel();
649
650 let exec_clone = executor.clone();
651 let handle = tokio::spawn(async move {
652 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel).await;
653 });
654
655 handle.await.expect("join");
656
657 assert_eq!(executor.call_count(), 2);
659 }
660
661 #[tokio::test]
662 async fn multiple_actions_processed_in_order() {
663 let executor = Arc::new(MockExecutor::new());
664 let (tx, rx) = create_posting_queue();
665 let cancel = CancellationToken::new();
666
667 let cancel_clone = cancel.clone();
668 let exec_clone = executor.clone();
669 let handle = tokio::spawn(async move {
670 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
671 });
672
673 for i in 0..5 {
674 tx.send(PostAction::Tweet {
675 content: format!("tweet-{i}"),
676 media_ids: vec![],
677 result_tx: None,
678 })
679 .await
680 .expect("send");
681 }
682
683 tokio::time::sleep(Duration::from_millis(100)).await;
685
686 cancel.cancel();
687 handle.await.expect("join");
688
689 let calls = executor.calls();
690 assert_eq!(calls.len(), 5);
691 for (i, (action_type, content)) in calls.iter().enumerate() {
692 assert_eq!(action_type, "tweet");
693 assert_eq!(content, &format!("tweet-{i}"));
694 }
695 }
696
697 #[test]
698 fn post_action_debug_format() {
699 let action = PostAction::Reply {
700 tweet_id: "123".to_string(),
701 content: "hello world".to_string(),
702 media_ids: vec![],
703 result_tx: None,
704 };
705 let debug = format!("{action:?}");
706 assert!(debug.contains("Reply"));
707 assert!(debug.contains("123"));
708 }
709
710 struct MockApprovalQueue {
713 items: Mutex<Vec<(String, String, String)>>,
714 }
715
716 impl MockApprovalQueue {
717 fn new() -> Self {
718 Self {
719 items: Mutex::new(Vec::new()),
720 }
721 }
722
723 fn item_count(&self) -> usize {
724 self.items.lock().expect("lock").len()
725 }
726 }
727
728 #[async_trait::async_trait]
729 impl ApprovalQueue for MockApprovalQueue {
730 async fn queue_reply(
731 &self,
732 tweet_id: &str,
733 content: &str,
734 _media_paths: &[String],
735 ) -> Result<i64, String> {
736 self.items.lock().expect("lock").push((
737 "reply".to_string(),
738 tweet_id.to_string(),
739 content.to_string(),
740 ));
741 Ok(self.item_count() as i64)
742 }
743
744 async fn queue_tweet(&self, content: &str, _media_paths: &[String]) -> Result<i64, String> {
745 self.items.lock().expect("lock").push((
746 "tweet".to_string(),
747 String::new(),
748 content.to_string(),
749 ));
750 Ok(self.item_count() as i64)
751 }
752 }
753
754 #[tokio::test]
755 async fn approval_mode_queues_instead_of_posting() {
756 let executor = Arc::new(MockExecutor::new());
757 let approval = Arc::new(MockApprovalQueue::new());
758 let (tx, rx) = create_posting_queue();
759 let cancel = CancellationToken::new();
760
761 let cancel_clone = cancel.clone();
762 let exec_clone = executor.clone();
763 let approval_clone = approval.clone();
764 let handle = tokio::spawn(async move {
765 run_posting_queue_with_approval(
766 rx,
767 exec_clone,
768 Some(approval_clone),
769 Duration::ZERO,
770 Duration::ZERO,
771 None,
772 cancel_clone,
773 )
774 .await;
775 });
776
777 let (result_tx, result_rx) = oneshot::channel();
778 tx.send(PostAction::Reply {
779 tweet_id: "t1".to_string(),
780 content: "hello".to_string(),
781 media_ids: vec![],
782 result_tx: Some(result_tx),
783 })
784 .await
785 .expect("send");
786
787 let result = result_rx.await.expect("recv");
788 assert!(result.is_ok());
789 assert!(result.unwrap().starts_with("queued:"));
790
791 assert_eq!(executor.call_count(), 0);
793 assert_eq!(approval.item_count(), 1);
795
796 cancel.cancel();
797 handle.await.expect("join");
798 }
799
800 #[tokio::test]
801 async fn approval_mode_queues_tweets() {
802 let executor = Arc::new(MockExecutor::new());
803 let approval = Arc::new(MockApprovalQueue::new());
804 let (tx, rx) = create_posting_queue();
805 let cancel = CancellationToken::new();
806
807 let cancel_clone = cancel.clone();
808 let exec_clone = executor.clone();
809 let approval_clone = approval.clone();
810 let handle = tokio::spawn(async move {
811 run_posting_queue_with_approval(
812 rx,
813 exec_clone,
814 Some(approval_clone),
815 Duration::ZERO,
816 Duration::ZERO,
817 None,
818 cancel_clone,
819 )
820 .await;
821 });
822
823 tx.send(PostAction::Tweet {
824 content: "my tweet".to_string(),
825 media_ids: vec![],
826 result_tx: None,
827 })
828 .await
829 .expect("send");
830
831 tokio::time::sleep(Duration::from_millis(50)).await;
832
833 assert_eq!(executor.call_count(), 0);
834 assert_eq!(approval.item_count(), 1);
835
836 cancel.cancel();
837 handle.await.expect("join");
838 }
839}