Skip to main content

tuitbot_core/automation/
posting_queue.rs

1//! Serialized posting queue for concurrent automation loops.
2//!
3//! All loops funnel post actions through a single bounded MPSC channel,
4//! preventing race conditions and ensuring rate limits are respected
5//! globally. A single consumer task processes actions sequentially with
6//! configurable delays between posts.
7
8use rand::Rng;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12use tokio_util::sync::CancellationToken;
13
/// Capacity of the bounded channel that backs the posting queue.
///
/// [`create_posting_queue`] passes this value to `mpsc::channel`.
pub const QUEUE_CAPACITY: usize = 100;
16
17/// An action to be executed by the posting queue consumer.
18///
19/// Each variant optionally includes a oneshot sender so the caller can
20/// await the result (e.g., the posted tweet ID or an error message).
21pub enum PostAction {
22    /// Reply to an existing tweet.
23    Reply {
24        /// The ID of the tweet to reply to.
25        tweet_id: String,
26        /// The reply content.
27        content: String,
28        /// Optional channel to receive the result (posted tweet ID or error).
29        result_tx: Option<oneshot::Sender<Result<String, String>>>,
30    },
31    /// Post a new original tweet.
32    Tweet {
33        /// The tweet content.
34        content: String,
35        /// Optional channel to receive the result.
36        result_tx: Option<oneshot::Sender<Result<String, String>>>,
37    },
38    /// Post a tweet as part of a thread (reply to previous tweet in thread).
39    ThreadTweet {
40        /// The tweet content.
41        content: String,
42        /// The ID of the previous tweet in the thread.
43        in_reply_to: String,
44        /// Optional channel to receive the result.
45        result_tx: Option<oneshot::Sender<Result<String, String>>>,
46    },
47}
48
49impl std::fmt::Debug for PostAction {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            PostAction::Reply {
53                tweet_id, content, ..
54            } => f
55                .debug_struct("Reply")
56                .field("tweet_id", tweet_id)
57                .field("content_len", &content.len())
58                .finish(),
59            PostAction::Tweet { content, .. } => f
60                .debug_struct("Tweet")
61                .field("content_len", &content.len())
62                .finish(),
63            PostAction::ThreadTweet {
64                content,
65                in_reply_to,
66                ..
67            } => f
68                .debug_struct("ThreadTweet")
69                .field("in_reply_to", in_reply_to)
70                .field("content_len", &content.len())
71                .finish(),
72        }
73    }
74}
75
76/// Trait for executing post actions against the X API.
77///
78/// This trait decouples the posting queue from the actual API client,
79/// allowing the queue to be tested with mock executors.
80#[async_trait::async_trait]
81pub trait PostExecutor: Send + Sync {
82    /// Post a reply to a specific tweet. Returns the posted tweet ID.
83    async fn execute_reply(&self, tweet_id: &str, content: &str) -> Result<String, String>;
84
85    /// Post a new original tweet. Returns the posted tweet ID.
86    async fn execute_tweet(&self, content: &str) -> Result<String, String>;
87}
88
89/// Create a bounded posting queue channel.
90///
91/// Returns `(sender, receiver)`. Clone the sender for each automation loop.
92/// Pass the receiver to [`run_posting_queue`].
93pub fn create_posting_queue() -> (mpsc::Sender<PostAction>, mpsc::Receiver<PostAction>) {
94    mpsc::channel(QUEUE_CAPACITY)
95}
96
97/// Trait for queueing actions for human approval instead of posting.
98#[async_trait::async_trait]
99pub trait ApprovalQueue: Send + Sync {
100    /// Queue a reply for human review. Returns the queue item ID.
101    async fn queue_reply(&self, tweet_id: &str, content: &str) -> Result<i64, String>;
102
103    /// Queue a tweet for human review. Returns the queue item ID.
104    async fn queue_tweet(&self, content: &str) -> Result<i64, String>;
105}
106
107/// Run the posting queue consumer loop.
108///
109/// Processes actions sequentially with `min_delay` between each post.
110/// On cancellation, drains remaining actions in the channel before exiting.
111///
112/// When `approval_queue` is `Some`, actions are queued for human review
113/// instead of being posted directly.
114pub async fn run_posting_queue(
115    receiver: mpsc::Receiver<PostAction>,
116    executor: Arc<dyn PostExecutor>,
117    min_delay: Duration,
118    cancel: CancellationToken,
119) {
120    run_posting_queue_with_approval(receiver, executor, None, min_delay, min_delay, cancel).await;
121}
122
123/// Run the posting queue consumer loop with optional approval mode.
124///
125/// Delay between posts is randomized uniformly in `[min_delay, max_delay]`.
126pub async fn run_posting_queue_with_approval(
127    mut receiver: mpsc::Receiver<PostAction>,
128    executor: Arc<dyn PostExecutor>,
129    approval_queue: Option<Arc<dyn ApprovalQueue>>,
130    min_delay: Duration,
131    max_delay: Duration,
132    cancel: CancellationToken,
133) {
134    tracing::info!("Posting queue consumer started");
135
136    loop {
137        let action = tokio::select! {
138            biased;
139            _ = cancel.cancelled() => {
140                tracing::info!("Posting queue received cancellation, draining remaining actions");
141                break;
142            }
143            action = receiver.recv() => {
144                match action {
145                    Some(a) => a,
146                    None => {
147                        tracing::info!("Posting queue channel closed");
148                        break;
149                    }
150                }
151            }
152        };
153
154        execute_or_queue(action, &executor, &approval_queue).await;
155
156        let delay = randomized_delay(min_delay, max_delay);
157        if !delay.is_zero() {
158            tokio::time::sleep(delay).await;
159        }
160    }
161
162    // Drain remaining actions after cancellation or channel close.
163    let mut drained = 0u32;
164    while let Ok(action) = receiver.try_recv() {
165        execute_or_queue(action, &executor, &approval_queue).await;
166        drained += 1;
167    }
168
169    if drained > 0 {
170        tracing::info!(
171            count = drained,
172            "Drained remaining actions from posting queue"
173        );
174    }
175
176    tracing::info!("Posting queue consumer stopped");
177}
178
179/// Route a post action: queue for approval if approval mode is on, otherwise execute.
180async fn execute_or_queue(
181    action: PostAction,
182    executor: &Arc<dyn PostExecutor>,
183    approval_queue: &Option<Arc<dyn ApprovalQueue>>,
184) {
185    if let Some(queue) = approval_queue {
186        queue_for_approval(action, queue).await;
187    } else {
188        execute_and_respond(action, executor).await;
189    }
190}
191
192/// Queue a post action for human approval instead of posting.
193async fn queue_for_approval(action: PostAction, queue: &Arc<dyn ApprovalQueue>) {
194    let (result, result_tx) = match action {
195        PostAction::Reply {
196            tweet_id,
197            content,
198            result_tx,
199        } => {
200            tracing::info!(tweet_id = %tweet_id, "Queuing reply for approval");
201            let r = queue
202                .queue_reply(&tweet_id, &content)
203                .await
204                .map(|id| format!("queued:{id}"));
205            (r, result_tx)
206        }
207        PostAction::Tweet {
208            content, result_tx, ..
209        } => {
210            tracing::info!("Queuing tweet for approval");
211            let r = queue
212                .queue_tweet(&content)
213                .await
214                .map(|id| format!("queued:{id}"));
215            (r, result_tx)
216        }
217        PostAction::ThreadTweet {
218            content,
219            in_reply_to,
220            result_tx,
221        } => {
222            tracing::info!(in_reply_to = %in_reply_to, "Queuing thread tweet for approval");
223            let r = queue
224                .queue_reply(&in_reply_to, &content)
225                .await
226                .map(|id| format!("queued:{id}"));
227            (r, result_tx)
228        }
229    };
230
231    match &result {
232        Ok(id) => tracing::info!(queue_id = %id, "Action queued for approval"),
233        Err(e) => tracing::warn!(error = %e, "Failed to queue action for approval"),
234    }
235
236    if let Some(tx) = result_tx {
237        let _ = tx.send(result);
238    }
239}
240
241/// Execute a single post action and send the result back via oneshot.
242async fn execute_and_respond(action: PostAction, executor: &Arc<dyn PostExecutor>) {
243    let (result, result_tx) = match action {
244        PostAction::Reply {
245            tweet_id,
246            content,
247            result_tx,
248        } => {
249            tracing::debug!(tweet_id = %tweet_id, "Executing reply action");
250            let r = executor.execute_reply(&tweet_id, &content).await;
251            (r, result_tx)
252        }
253        PostAction::Tweet {
254            content, result_tx, ..
255        } => {
256            tracing::debug!("Executing tweet action");
257            let r = executor.execute_tweet(&content).await;
258            (r, result_tx)
259        }
260        PostAction::ThreadTweet {
261            content,
262            in_reply_to,
263            result_tx,
264        } => {
265            tracing::debug!(in_reply_to = %in_reply_to, "Executing thread tweet action");
266            let r = executor.execute_reply(&in_reply_to, &content).await;
267            (r, result_tx)
268        }
269    };
270
271    match &result {
272        Ok(id) => tracing::info!(tweet_id = %id, "Post action succeeded"),
273        Err(e) => tracing::warn!(error = %e, "Post action failed"),
274    }
275
276    if let Some(tx) = result_tx {
277        // Ignore send error (receiver may have been dropped).
278        let _ = tx.send(result);
279    }
280}
281
282/// Compute a randomized delay between `min` and `max`.
283fn randomized_delay(min: Duration, max: Duration) -> Duration {
284    if min >= max || min.is_zero() && max.is_zero() {
285        return min;
286    }
287    let min_ms = min.as_millis() as u64;
288    let max_ms = max.as_millis() as u64;
289    Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms))
290}
291
292#[cfg(test)]
293mod tests {
294    use super::*;
295    use std::sync::Mutex;
296
297    /// Mock executor that records all calls.
298    struct MockExecutor {
299        calls: Mutex<Vec<(String, String)>>,
300        fail: bool,
301    }
302
303    impl MockExecutor {
304        fn new() -> Self {
305            Self {
306                calls: Mutex::new(Vec::new()),
307                fail: false,
308            }
309        }
310
311        fn failing() -> Self {
312            Self {
313                calls: Mutex::new(Vec::new()),
314                fail: true,
315            }
316        }
317
318        fn call_count(&self) -> usize {
319            self.calls.lock().expect("lock poisoned").len()
320        }
321
322        fn calls(&self) -> Vec<(String, String)> {
323            self.calls.lock().expect("lock poisoned").clone()
324        }
325    }
326
327    #[async_trait::async_trait]
328    impl PostExecutor for MockExecutor {
329        async fn execute_reply(&self, tweet_id: &str, content: &str) -> Result<String, String> {
330            self.calls
331                .lock()
332                .expect("lock poisoned")
333                .push(("reply".to_string(), format!("{tweet_id}:{content}")));
334            if self.fail {
335                Err("mock error".to_string())
336            } else {
337                Ok("reply-id-123".to_string())
338            }
339        }
340
341        async fn execute_tweet(&self, content: &str) -> Result<String, String> {
342            self.calls
343                .lock()
344                .expect("lock poisoned")
345                .push(("tweet".to_string(), content.to_string()));
346            if self.fail {
347                Err("mock error".to_string())
348            } else {
349                Ok("tweet-id-456".to_string())
350            }
351        }
352    }
353
354    #[tokio::test]
355    async fn process_reply_action() {
356        let executor = Arc::new(MockExecutor::new());
357        let (tx, rx) = create_posting_queue();
358        let cancel = CancellationToken::new();
359
360        let cancel_clone = cancel.clone();
361        let exec_clone = executor.clone();
362        let handle = tokio::spawn(async move {
363            run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
364        });
365
366        let (result_tx, result_rx) = oneshot::channel();
367        tx.send(PostAction::Reply {
368            tweet_id: "t1".to_string(),
369            content: "hello".to_string(),
370            result_tx: Some(result_tx),
371        })
372        .await
373        .expect("send failed");
374
375        let result = result_rx.await.expect("oneshot recv");
376        assert_eq!(result, Ok("reply-id-123".to_string()));
377
378        cancel.cancel();
379        handle.await.expect("join");
380        assert_eq!(executor.call_count(), 1);
381    }
382
383    #[tokio::test]
384    async fn process_tweet_action() {
385        let executor = Arc::new(MockExecutor::new());
386        let (tx, rx) = create_posting_queue();
387        let cancel = CancellationToken::new();
388
389        let cancel_clone = cancel.clone();
390        let exec_clone = executor.clone();
391        let handle = tokio::spawn(async move {
392            run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
393        });
394
395        let (result_tx, result_rx) = oneshot::channel();
396        tx.send(PostAction::Tweet {
397            content: "my tweet".to_string(),
398            result_tx: Some(result_tx),
399        })
400        .await
401        .expect("send failed");
402
403        let result = result_rx.await.expect("oneshot recv");
404        assert_eq!(result, Ok("tweet-id-456".to_string()));
405
406        cancel.cancel();
407        handle.await.expect("join");
408    }
409
410    #[tokio::test]
411    async fn process_thread_tweet_action() {
412        let executor = Arc::new(MockExecutor::new());
413        let (tx, rx) = create_posting_queue();
414        let cancel = CancellationToken::new();
415
416        let cancel_clone = cancel.clone();
417        let exec_clone = executor.clone();
418        let handle = tokio::spawn(async move {
419            run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
420        });
421
422        let (result_tx, result_rx) = oneshot::channel();
423        tx.send(PostAction::ThreadTweet {
424            content: "thread part 2".to_string(),
425            in_reply_to: "prev-id".to_string(),
426            result_tx: Some(result_tx),
427        })
428        .await
429        .expect("send failed");
430
431        let result = result_rx.await.expect("oneshot recv");
432        assert_eq!(result, Ok("reply-id-123".to_string()));
433
434        cancel.cancel();
435        handle.await.expect("join");
436
437        let calls = executor.calls();
438        assert_eq!(calls[0].0, "reply");
439        assert!(calls[0].1.contains("prev-id"));
440    }
441
442    #[tokio::test]
443    async fn result_tx_none_does_not_panic() {
444        let executor = Arc::new(MockExecutor::new());
445        let (tx, rx) = create_posting_queue();
446        let cancel = CancellationToken::new();
447
448        let cancel_clone = cancel.clone();
449        let exec_clone = executor.clone();
450        let handle = tokio::spawn(async move {
451            run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
452        });
453
454        tx.send(PostAction::Tweet {
455            content: "fire and forget".to_string(),
456            result_tx: None,
457        })
458        .await
459        .expect("send failed");
460
461        // Give time for processing
462        tokio::time::sleep(Duration::from_millis(50)).await;
463
464        cancel.cancel();
465        handle.await.expect("join");
466        assert_eq!(executor.call_count(), 1);
467    }
468
469    #[tokio::test]
470    async fn failed_action_sends_error_back() {
471        let executor = Arc::new(MockExecutor::failing());
472        let (tx, rx) = create_posting_queue();
473        let cancel = CancellationToken::new();
474
475        let cancel_clone = cancel.clone();
476        let exec_clone = executor.clone();
477        let handle = tokio::spawn(async move {
478            run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
479        });
480
481        let (result_tx, result_rx) = oneshot::channel();
482        tx.send(PostAction::Tweet {
483            content: "will fail".to_string(),
484            result_tx: Some(result_tx),
485        })
486        .await
487        .expect("send failed");
488
489        let result = result_rx.await.expect("oneshot recv");
490        assert!(result.is_err());
491        assert_eq!(result.unwrap_err(), "mock error");
492
493        cancel.cancel();
494        handle.await.expect("join");
495    }
496
497    #[tokio::test]
498    async fn channel_close_exits_consumer() {
499        let executor = Arc::new(MockExecutor::new());
500        let (tx, rx) = create_posting_queue();
501        let cancel = CancellationToken::new();
502
503        let handle = tokio::spawn(async move {
504            run_posting_queue(rx, executor, Duration::ZERO, cancel).await;
505        });
506
507        // Drop sender to close channel
508        drop(tx);
509
510        // Consumer should exit cleanly
511        handle.await.expect("join");
512    }
513
514    #[tokio::test]
515    async fn drain_on_cancel() {
516        let executor = Arc::new(MockExecutor::new());
517        let (tx, rx) = create_posting_queue();
518        let cancel = CancellationToken::new();
519
520        // Send actions before starting consumer
521        tx.send(PostAction::Tweet {
522            content: "queued1".to_string(),
523            result_tx: None,
524        })
525        .await
526        .expect("send");
527        tx.send(PostAction::Tweet {
528            content: "queued2".to_string(),
529            result_tx: None,
530        })
531        .await
532        .expect("send");
533
534        // Cancel immediately
535        cancel.cancel();
536
537        let exec_clone = executor.clone();
538        let handle = tokio::spawn(async move {
539            run_posting_queue(rx, exec_clone, Duration::ZERO, cancel).await;
540        });
541
542        handle.await.expect("join");
543
544        // Both queued actions should have been drained
545        assert_eq!(executor.call_count(), 2);
546    }
547
548    #[tokio::test]
549    async fn multiple_actions_processed_in_order() {
550        let executor = Arc::new(MockExecutor::new());
551        let (tx, rx) = create_posting_queue();
552        let cancel = CancellationToken::new();
553
554        let cancel_clone = cancel.clone();
555        let exec_clone = executor.clone();
556        let handle = tokio::spawn(async move {
557            run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
558        });
559
560        for i in 0..5 {
561            tx.send(PostAction::Tweet {
562                content: format!("tweet-{i}"),
563                result_tx: None,
564            })
565            .await
566            .expect("send");
567        }
568
569        // Wait for processing
570        tokio::time::sleep(Duration::from_millis(100)).await;
571
572        cancel.cancel();
573        handle.await.expect("join");
574
575        let calls = executor.calls();
576        assert_eq!(calls.len(), 5);
577        for (i, (action_type, content)) in calls.iter().enumerate() {
578            assert_eq!(action_type, "tweet");
579            assert_eq!(content, &format!("tweet-{i}"));
580        }
581    }
582
583    #[test]
584    fn post_action_debug_format() {
585        let action = PostAction::Reply {
586            tweet_id: "123".to_string(),
587            content: "hello world".to_string(),
588            result_tx: None,
589        };
590        let debug = format!("{action:?}");
591        assert!(debug.contains("Reply"));
592        assert!(debug.contains("123"));
593    }
594
595    // --- Approval queue tests ---
596
597    struct MockApprovalQueue {
598        items: Mutex<Vec<(String, String, String)>>,
599    }
600
601    impl MockApprovalQueue {
602        fn new() -> Self {
603            Self {
604                items: Mutex::new(Vec::new()),
605            }
606        }
607
608        fn item_count(&self) -> usize {
609            self.items.lock().expect("lock").len()
610        }
611    }
612
613    #[async_trait::async_trait]
614    impl ApprovalQueue for MockApprovalQueue {
615        async fn queue_reply(&self, tweet_id: &str, content: &str) -> Result<i64, String> {
616            self.items.lock().expect("lock").push((
617                "reply".to_string(),
618                tweet_id.to_string(),
619                content.to_string(),
620            ));
621            Ok(self.item_count() as i64)
622        }
623
624        async fn queue_tweet(&self, content: &str) -> Result<i64, String> {
625            self.items.lock().expect("lock").push((
626                "tweet".to_string(),
627                String::new(),
628                content.to_string(),
629            ));
630            Ok(self.item_count() as i64)
631        }
632    }
633
634    #[tokio::test]
635    async fn approval_mode_queues_instead_of_posting() {
636        let executor = Arc::new(MockExecutor::new());
637        let approval = Arc::new(MockApprovalQueue::new());
638        let (tx, rx) = create_posting_queue();
639        let cancel = CancellationToken::new();
640
641        let cancel_clone = cancel.clone();
642        let exec_clone = executor.clone();
643        let approval_clone = approval.clone();
644        let handle = tokio::spawn(async move {
645            run_posting_queue_with_approval(
646                rx,
647                exec_clone,
648                Some(approval_clone),
649                Duration::ZERO,
650                Duration::ZERO,
651                cancel_clone,
652            )
653            .await;
654        });
655
656        let (result_tx, result_rx) = oneshot::channel();
657        tx.send(PostAction::Reply {
658            tweet_id: "t1".to_string(),
659            content: "hello".to_string(),
660            result_tx: Some(result_tx),
661        })
662        .await
663        .expect("send");
664
665        let result = result_rx.await.expect("recv");
666        assert!(result.is_ok());
667        assert!(result.unwrap().starts_with("queued:"));
668
669        // Executor should NOT have been called
670        assert_eq!(executor.call_count(), 0);
671        // Approval queue should have the item
672        assert_eq!(approval.item_count(), 1);
673
674        cancel.cancel();
675        handle.await.expect("join");
676    }
677
678    #[tokio::test]
679    async fn approval_mode_queues_tweets() {
680        let executor = Arc::new(MockExecutor::new());
681        let approval = Arc::new(MockApprovalQueue::new());
682        let (tx, rx) = create_posting_queue();
683        let cancel = CancellationToken::new();
684
685        let cancel_clone = cancel.clone();
686        let exec_clone = executor.clone();
687        let approval_clone = approval.clone();
688        let handle = tokio::spawn(async move {
689            run_posting_queue_with_approval(
690                rx,
691                exec_clone,
692                Some(approval_clone),
693                Duration::ZERO,
694                Duration::ZERO,
695                cancel_clone,
696            )
697            .await;
698        });
699
700        tx.send(PostAction::Tweet {
701            content: "my tweet".to_string(),
702            result_tx: None,
703        })
704        .await
705        .expect("send");
706
707        tokio::time::sleep(Duration::from_millis(50)).await;
708
709        assert_eq!(executor.call_count(), 0);
710        assert_eq!(approval.item_count(), 1);
711
712        cancel.cancel();
713        handle.await.expect("join");
714    }
715}