Skip to main content

tuitbot_core/automation/
posting_queue.rs

1//! Serialized posting queue for concurrent automation loops.
2//!
3//! All loops funnel post actions through a single bounded MPSC channel,
4//! preventing race conditions and ensuring rate limits are respected
5//! globally. A single consumer task processes actions sequentially with
6//! configurable delays between posts.
7
8use rand::Rng;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12use tokio_util::sync::CancellationToken;
13
14use super::circuit_breaker::CircuitBreaker;
15
/// Number of actions the channel built by [`create_posting_queue`] can
/// buffer before it is full.
pub const QUEUE_CAPACITY: usize = 100;
18
19/// An action to be executed by the posting queue consumer.
20///
21/// Each variant optionally includes a oneshot sender so the caller can
22/// await the result (e.g., the posted tweet ID or an error message).
23pub enum PostAction {
24    /// Reply to an existing tweet.
25    Reply {
26        /// The ID of the tweet to reply to.
27        tweet_id: String,
28        /// The reply content.
29        content: String,
30        /// Media IDs to attach (already uploaded to X API).
31        media_ids: Vec<String>,
32        /// Optional channel to receive the result (posted tweet ID or error).
33        result_tx: Option<oneshot::Sender<Result<String, String>>>,
34    },
35    /// Post a new original tweet.
36    Tweet {
37        /// The tweet content.
38        content: String,
39        /// Media IDs to attach (already uploaded to X API).
40        media_ids: Vec<String>,
41        /// Optional channel to receive the result.
42        result_tx: Option<oneshot::Sender<Result<String, String>>>,
43    },
44    /// Post a tweet as part of a thread (reply to previous tweet in thread).
45    ThreadTweet {
46        /// The tweet content.
47        content: String,
48        /// The ID of the previous tweet in the thread.
49        in_reply_to: String,
50        /// Media IDs to attach (already uploaded to X API).
51        media_ids: Vec<String>,
52        /// Optional channel to receive the result.
53        result_tx: Option<oneshot::Sender<Result<String, String>>>,
54    },
55}
56
57impl std::fmt::Debug for PostAction {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        match self {
60            PostAction::Reply {
61                tweet_id,
62                content,
63                media_ids,
64                ..
65            } => f
66                .debug_struct("Reply")
67                .field("tweet_id", tweet_id)
68                .field("content_len", &content.len())
69                .field("media_count", &media_ids.len())
70                .finish(),
71            PostAction::Tweet {
72                content, media_ids, ..
73            } => f
74                .debug_struct("Tweet")
75                .field("content_len", &content.len())
76                .field("media_count", &media_ids.len())
77                .finish(),
78            PostAction::ThreadTweet {
79                content,
80                in_reply_to,
81                media_ids,
82                ..
83            } => f
84                .debug_struct("ThreadTweet")
85                .field("in_reply_to", in_reply_to)
86                .field("content_len", &content.len())
87                .field("media_count", &media_ids.len())
88                .finish(),
89        }
90    }
91}
92
93/// Trait for executing post actions against the X API.
94///
95/// This trait decouples the posting queue from the actual API client,
96/// allowing the queue to be tested with mock executors.
97#[async_trait::async_trait]
98pub trait PostExecutor: Send + Sync {
99    /// Post a reply to a specific tweet. Returns the posted tweet ID.
100    async fn execute_reply(
101        &self,
102        tweet_id: &str,
103        content: &str,
104        media_ids: &[String],
105    ) -> Result<String, String>;
106
107    /// Post a new original tweet. Returns the posted tweet ID.
108    async fn execute_tweet(&self, content: &str, media_ids: &[String]) -> Result<String, String>;
109}
110
111/// Create a bounded posting queue channel.
112///
113/// Returns `(sender, receiver)`. Clone the sender for each automation loop.
114/// Pass the receiver to [`run_posting_queue`].
115pub fn create_posting_queue() -> (mpsc::Sender<PostAction>, mpsc::Receiver<PostAction>) {
116    mpsc::channel(QUEUE_CAPACITY)
117}
118
119/// Trait for queueing actions for human approval instead of posting.
120#[async_trait::async_trait]
121pub trait ApprovalQueue: Send + Sync {
122    /// Queue a reply for human review. Returns the queue item ID.
123    async fn queue_reply(
124        &self,
125        tweet_id: &str,
126        content: &str,
127        media_paths: &[String],
128    ) -> Result<i64, String>;
129
130    /// Queue a tweet for human review. Returns the queue item ID.
131    async fn queue_tweet(&self, content: &str, media_paths: &[String]) -> Result<i64, String>;
132}
133
134/// Run the posting queue consumer loop.
135///
136/// Processes actions sequentially with `min_delay` between each post.
137/// On cancellation, drains remaining actions in the channel before exiting.
138///
139/// When `approval_queue` is `Some`, actions are queued for human review
140/// instead of being posted directly.
141pub async fn run_posting_queue(
142    receiver: mpsc::Receiver<PostAction>,
143    executor: Arc<dyn PostExecutor>,
144    min_delay: Duration,
145    cancel: CancellationToken,
146) {
147    run_posting_queue_with_approval(receiver, executor, None, min_delay, min_delay, None, cancel)
148        .await;
149}
150
151/// Run the posting queue consumer loop with optional approval mode.
152///
153/// Delay between posts is randomized uniformly in `[min_delay, max_delay]`.
154/// If a `circuit_breaker` is provided, mutations are gated: the queue blocks
155/// while the breaker is Open, and errors/successes are recorded.
156pub async fn run_posting_queue_with_approval(
157    mut receiver: mpsc::Receiver<PostAction>,
158    executor: Arc<dyn PostExecutor>,
159    approval_queue: Option<Arc<dyn ApprovalQueue>>,
160    min_delay: Duration,
161    max_delay: Duration,
162    circuit_breaker: Option<Arc<CircuitBreaker>>,
163    cancel: CancellationToken,
164) {
165    tracing::info!("Posting queue consumer started");
166
167    loop {
168        let action = tokio::select! {
169            biased;
170            _ = cancel.cancelled() => {
171                tracing::info!("Posting queue received cancellation, draining remaining actions");
172                break;
173            }
174            action = receiver.recv() => {
175                match action {
176                    Some(a) => a,
177                    None => {
178                        tracing::info!("Posting queue channel closed");
179                        break;
180                    }
181                }
182            }
183        };
184
185        // Gate on circuit breaker (only for direct execution, not approval queue).
186        if approval_queue.is_none() {
187            if let Some(ref cb) = circuit_breaker {
188                if !cb.should_allow_mutation().await {
189                    tracing::warn!("Circuit breaker open — waiting before posting");
190                    if !cb.wait_until_closed(&cancel).await {
191                        tracing::info!("Cancelled while waiting for circuit breaker");
192                        break;
193                    }
194                }
195            }
196        }
197
198        let result = execute_or_queue(action, &executor, &approval_queue).await;
199
200        // Record result in circuit breaker.
201        if approval_queue.is_none() {
202            if let Some(ref cb) = circuit_breaker {
203                match result {
204                    PostResult::Success => {
205                        cb.record_success().await;
206                    }
207                    PostResult::Error(ref msg) if is_rate_limit_error(msg) => {
208                        cb.record_error().await;
209                    }
210                    _ => {}
211                }
212            }
213        }
214
215        let delay = randomized_delay(min_delay, max_delay);
216        if !delay.is_zero() {
217            tokio::time::sleep(delay).await;
218        }
219    }
220
221    // Drain remaining actions after cancellation or channel close.
222    let mut drained = 0u32;
223    while let Ok(action) = receiver.try_recv() {
224        execute_or_queue(action, &executor, &approval_queue).await;
225        drained += 1;
226    }
227
228    if drained > 0 {
229        tracing::info!(
230            count = drained,
231            "Drained remaining actions from posting queue"
232        );
233    }
234
235    tracing::info!("Posting queue consumer stopped");
236}
237
/// Report whether `msg` looks like a rate-limit or forbidden response.
///
/// The check is a case-insensitive substring search for any of a fixed set
/// of markers (status codes `429`/`403` and their usual phrases).
fn is_rate_limit_error(msg: &str) -> bool {
    const MARKERS: [&str; 5] = [
        "rate limit",
        "too many requests",
        "429",
        "forbidden",
        "403",
    ];

    let normalized = msg.to_lowercase();
    MARKERS.iter().any(|marker| normalized.contains(*marker))
}
247
/// What happened to a single post action, as seen by the consumer loop
/// (used to feed the circuit breaker).
enum PostResult {
    /// The executor reported success.
    Success,
    /// The executor failed; carries its error message.
    Error(String),
    /// The action was routed to the approval queue instead of the executor.
    Queued,
}
254
255/// Route a post action: queue for approval if approval mode is on, otherwise execute.
256async fn execute_or_queue(
257    action: PostAction,
258    executor: &Arc<dyn PostExecutor>,
259    approval_queue: &Option<Arc<dyn ApprovalQueue>>,
260) -> PostResult {
261    if let Some(queue) = approval_queue {
262        queue_for_approval(action, queue).await;
263        PostResult::Queued
264    } else {
265        execute_and_respond(action, executor).await
266    }
267}
268
269/// Queue a post action for human approval instead of posting.
270async fn queue_for_approval(action: PostAction, queue: &Arc<dyn ApprovalQueue>) {
271    let (result, result_tx) = match action {
272        PostAction::Reply {
273            tweet_id,
274            content,
275            media_ids: _,
276            result_tx,
277        } => {
278            tracing::info!(tweet_id = %tweet_id, "Queuing reply for approval");
279            let r = queue
280                .queue_reply(&tweet_id, &content, &[])
281                .await
282                .map(|id| format!("queued:{id}"));
283            (r, result_tx)
284        }
285        PostAction::Tweet {
286            content,
287            media_ids: _,
288            result_tx,
289        } => {
290            tracing::info!("Queuing tweet for approval");
291            let r = queue
292                .queue_tweet(&content, &[])
293                .await
294                .map(|id| format!("queued:{id}"));
295            (r, result_tx)
296        }
297        PostAction::ThreadTweet {
298            content,
299            in_reply_to,
300            media_ids: _,
301            result_tx,
302        } => {
303            tracing::info!(in_reply_to = %in_reply_to, "Queuing thread tweet for approval");
304            let r = queue
305                .queue_reply(&in_reply_to, &content, &[])
306                .await
307                .map(|id| format!("queued:{id}"));
308            (r, result_tx)
309        }
310    };
311
312    match &result {
313        Ok(id) => tracing::info!(queue_id = %id, "Action queued for approval"),
314        Err(e) => tracing::warn!(error = %e, "Failed to queue action for approval"),
315    }
316
317    if let Some(tx) = result_tx {
318        let _ = tx.send(result);
319    }
320}
321
322/// Execute a single post action and send the result back via oneshot.
323async fn execute_and_respond(action: PostAction, executor: &Arc<dyn PostExecutor>) -> PostResult {
324    let (result, result_tx) = match action {
325        PostAction::Reply {
326            tweet_id,
327            content,
328            media_ids,
329            result_tx,
330        } => {
331            tracing::debug!(tweet_id = %tweet_id, "Executing reply action");
332            let r = executor
333                .execute_reply(&tweet_id, &content, &media_ids)
334                .await;
335            (r, result_tx)
336        }
337        PostAction::Tweet {
338            content,
339            media_ids,
340            result_tx,
341        } => {
342            tracing::debug!("Executing tweet action");
343            let r = executor.execute_tweet(&content, &media_ids).await;
344            (r, result_tx)
345        }
346        PostAction::ThreadTweet {
347            content,
348            in_reply_to,
349            media_ids,
350            result_tx,
351        } => {
352            tracing::debug!(in_reply_to = %in_reply_to, "Executing thread tweet action");
353            let r = executor
354                .execute_reply(&in_reply_to, &content, &media_ids)
355                .await;
356            (r, result_tx)
357        }
358    };
359
360    let post_result = match &result {
361        Ok(id) => {
362            tracing::info!(tweet_id = %id, "Post action succeeded");
363            PostResult::Success
364        }
365        Err(e) => {
366            tracing::warn!(error = %e, "Post action failed");
367            PostResult::Error(e.clone())
368        }
369    };
370
371    if let Some(tx) = result_tx {
372        // Ignore send error (receiver may have been dropped).
373        let _ = tx.send(result);
374    }
375
376    post_result
377}
378
379/// Compute a randomized delay between `min` and `max`.
380fn randomized_delay(min: Duration, max: Duration) -> Duration {
381    if min >= max || min.is_zero() && max.is_zero() {
382        return min;
383    }
384    let min_ms = min.as_millis() as u64;
385    let max_ms = max.as_millis() as u64;
386    Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms))
387}
388
#[cfg(test)]
mod tests {
    //! Tests for the posting queue consumer.
    //!
    //! No test waits on a fixed sleep: each one either awaits a oneshot
    //! result or cancels and joins the consumer, which processes every
    //! buffered action before it returns.

    use super::*;
    use std::sync::Mutex;
    use tokio::task::JoinHandle;

    /// Executor double that records every call and can be told to fail.
    struct MockExecutor {
        /// `(kind, detail)` per call, in call order.
        calls: Mutex<Vec<(String, String)>>,
        /// When true, every call returns `Err("mock error")`.
        fail: bool,
    }

    impl MockExecutor {
        fn new() -> Self {
            Self {
                calls: Mutex::new(Vec::new()),
                fail: false,
            }
        }

        fn failing() -> Self {
            Self {
                calls: Mutex::new(Vec::new()),
                fail: true,
            }
        }

        fn call_count(&self) -> usize {
            self.calls.lock().expect("lock poisoned").len()
        }

        fn calls(&self) -> Vec<(String, String)> {
            self.calls.lock().expect("lock poisoned").clone()
        }

        /// Record a call, then return the canned outcome for it.
        fn record(&self, kind: &str, detail: String, ok_id: &str) -> Result<String, String> {
            self.calls
                .lock()
                .expect("lock poisoned")
                .push((kind.to_string(), detail));
            if self.fail {
                Err("mock error".to_string())
            } else {
                Ok(ok_id.to_string())
            }
        }
    }

    #[async_trait::async_trait]
    impl PostExecutor for MockExecutor {
        async fn execute_reply(
            &self,
            tweet_id: &str,
            content: &str,
            _media_ids: &[String],
        ) -> Result<String, String> {
            self.record("reply", format!("{tweet_id}:{content}"), "reply-id-123")
        }

        async fn execute_tweet(
            &self,
            content: &str,
            _media_ids: &[String],
        ) -> Result<String, String> {
            self.record("tweet", content.to_string(), "tweet-id-456")
        }
    }

    /// Spawn the direct-posting consumer with no delay between posts.
    fn spawn_consumer(
        rx: mpsc::Receiver<PostAction>,
        executor: Arc<MockExecutor>,
        cancel: CancellationToken,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            run_posting_queue(rx, executor, Duration::ZERO, cancel).await;
        })
    }

    /// Build a tweet action whose result nobody waits for.
    fn fire_and_forget_tweet(content: &str) -> PostAction {
        PostAction::Tweet {
            content: content.to_string(),
            media_ids: vec![],
            result_tx: None,
        }
    }

    #[tokio::test]
    async fn process_reply_action() {
        let executor = Arc::new(MockExecutor::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_consumer(rx, Arc::clone(&executor), cancel.clone());

        let (result_tx, result_rx) = oneshot::channel();
        tx.send(PostAction::Reply {
            tweet_id: "t1".to_string(),
            content: "hello".to_string(),
            media_ids: vec![],
            result_tx: Some(result_tx),
        })
        .await
        .expect("send failed");

        let result = result_rx.await.expect("oneshot recv");
        assert_eq!(result, Ok("reply-id-123".to_string()));

        cancel.cancel();
        handle.await.expect("join");
        assert_eq!(executor.call_count(), 1);
    }

    #[tokio::test]
    async fn process_tweet_action() {
        let executor = Arc::new(MockExecutor::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_consumer(rx, Arc::clone(&executor), cancel.clone());

        let (result_tx, result_rx) = oneshot::channel();
        tx.send(PostAction::Tweet {
            content: "my tweet".to_string(),
            media_ids: vec![],
            result_tx: Some(result_tx),
        })
        .await
        .expect("send failed");

        let result = result_rx.await.expect("oneshot recv");
        assert_eq!(result, Ok("tweet-id-456".to_string()));

        cancel.cancel();
        handle.await.expect("join");
    }

    #[tokio::test]
    async fn process_thread_tweet_action() {
        let executor = Arc::new(MockExecutor::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_consumer(rx, Arc::clone(&executor), cancel.clone());

        let (result_tx, result_rx) = oneshot::channel();
        tx.send(PostAction::ThreadTweet {
            content: "thread part 2".to_string(),
            in_reply_to: "prev-id".to_string(),
            media_ids: vec![],
            result_tx: Some(result_tx),
        })
        .await
        .expect("send failed");

        let result = result_rx.await.expect("oneshot recv");
        assert_eq!(result, Ok("reply-id-123".to_string()));

        cancel.cancel();
        handle.await.expect("join");

        // Thread tweets are posted through the reply path.
        let calls = executor.calls();
        assert_eq!(calls[0].0, "reply");
        assert!(calls[0].1.contains("prev-id"));
    }

    #[tokio::test]
    async fn result_tx_none_does_not_panic() {
        let executor = Arc::new(MockExecutor::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_consumer(rx, Arc::clone(&executor), cancel.clone());

        tx.send(fire_and_forget_tweet("fire and forget"))
            .await
            .expect("send failed");

        // No sleep needed: the consumer processes buffered actions before
        // it stops, so joining it guarantees the action was handled.
        cancel.cancel();
        handle.await.expect("join");
        assert_eq!(executor.call_count(), 1);
    }

    #[tokio::test]
    async fn failed_action_sends_error_back() {
        let executor = Arc::new(MockExecutor::failing());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_consumer(rx, Arc::clone(&executor), cancel.clone());

        let (result_tx, result_rx) = oneshot::channel();
        tx.send(PostAction::Tweet {
            content: "will fail".to_string(),
            media_ids: vec![],
            result_tx: Some(result_tx),
        })
        .await
        .expect("send failed");

        let result = result_rx.await.expect("oneshot recv");
        assert_eq!(result, Err("mock error".to_string()));

        cancel.cancel();
        handle.await.expect("join");
    }

    #[tokio::test]
    async fn channel_close_exits_consumer() {
        let executor = Arc::new(MockExecutor::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_consumer(rx, executor, cancel);

        // Drop sender to close channel
        drop(tx);

        // Consumer should exit cleanly
        handle.await.expect("join");
    }

    #[tokio::test]
    async fn drain_on_cancel() {
        let executor = Arc::new(MockExecutor::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();

        // Send actions before starting consumer
        for content in ["queued1", "queued2"] {
            tx.send(fire_and_forget_tweet(content)).await.expect("send");
        }

        // Cancel before the consumer even starts.
        cancel.cancel();

        let handle = spawn_consumer(rx, Arc::clone(&executor), cancel);
        handle.await.expect("join");

        // Both queued actions should have been drained
        assert_eq!(executor.call_count(), 2);
    }

    #[tokio::test]
    async fn multiple_actions_processed_in_order() {
        let executor = Arc::new(MockExecutor::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_consumer(rx, Arc::clone(&executor), cancel.clone());

        for i in 0..5 {
            tx.send(fire_and_forget_tweet(&format!("tweet-{i}")))
                .await
                .expect("send");
        }

        // Whatever is still buffered is processed, in order, on shutdown.
        cancel.cancel();
        handle.await.expect("join");

        let calls = executor.calls();
        assert_eq!(calls.len(), 5);
        for (i, (action_type, content)) in calls.iter().enumerate() {
            assert_eq!(action_type, "tweet");
            assert_eq!(content, &format!("tweet-{i}"));
        }
    }

    #[test]
    fn post_action_debug_format() {
        let action = PostAction::Reply {
            tweet_id: "123".to_string(),
            content: "hello world".to_string(),
            media_ids: vec![],
            result_tx: None,
        };
        let debug = format!("{action:?}");
        assert!(debug.contains("Reply"));
        assert!(debug.contains("123"));
    }

    // --- Approval queue tests ---

    /// Approval queue double that stores `(kind, tweet_id, content)` tuples.
    struct MockApprovalQueue {
        items: Mutex<Vec<(String, String, String)>>,
    }

    impl MockApprovalQueue {
        fn new() -> Self {
            Self {
                items: Mutex::new(Vec::new()),
            }
        }

        fn item_count(&self) -> usize {
            self.items.lock().expect("lock").len()
        }

        /// Store an item and return the new item count as its ID.
        fn push(&self, kind: &str, tweet_id: &str, content: &str) -> i64 {
            let mut items = self.items.lock().expect("lock");
            items.push((kind.to_string(), tweet_id.to_string(), content.to_string()));
            items.len() as i64
        }
    }

    #[async_trait::async_trait]
    impl ApprovalQueue for MockApprovalQueue {
        async fn queue_reply(
            &self,
            tweet_id: &str,
            content: &str,
            _media_paths: &[String],
        ) -> Result<i64, String> {
            Ok(self.push("reply", tweet_id, content))
        }

        async fn queue_tweet(&self, content: &str, _media_paths: &[String]) -> Result<i64, String> {
            Ok(self.push("tweet", "", content))
        }
    }

    /// Spawn the consumer in approval mode with no delay and no breaker.
    fn spawn_approval_consumer(
        rx: mpsc::Receiver<PostAction>,
        executor: Arc<MockExecutor>,
        approval: Arc<MockApprovalQueue>,
        cancel: CancellationToken,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            run_posting_queue_with_approval(
                rx,
                executor,
                Some(approval),
                Duration::ZERO,
                Duration::ZERO,
                None,
                cancel,
            )
            .await;
        })
    }

    #[tokio::test]
    async fn approval_mode_queues_instead_of_posting() {
        let executor = Arc::new(MockExecutor::new());
        let approval = Arc::new(MockApprovalQueue::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_approval_consumer(
            rx,
            Arc::clone(&executor),
            Arc::clone(&approval),
            cancel.clone(),
        );

        let (result_tx, result_rx) = oneshot::channel();
        tx.send(PostAction::Reply {
            tweet_id: "t1".to_string(),
            content: "hello".to_string(),
            media_ids: vec![],
            result_tx: Some(result_tx),
        })
        .await
        .expect("send");

        let result = result_rx.await.expect("recv");
        assert!(result.is_ok());
        assert!(result.unwrap().starts_with("queued:"));

        // Executor should NOT have been called
        assert_eq!(executor.call_count(), 0);
        // Approval queue should have the item
        assert_eq!(approval.item_count(), 1);

        cancel.cancel();
        handle.await.expect("join");
    }

    #[tokio::test]
    async fn approval_mode_queues_tweets() {
        let executor = Arc::new(MockExecutor::new());
        let approval = Arc::new(MockApprovalQueue::new());
        let (tx, rx) = create_posting_queue();
        let cancel = CancellationToken::new();
        let handle = spawn_approval_consumer(
            rx,
            Arc::clone(&executor),
            Arc::clone(&approval),
            cancel.clone(),
        );

        tx.send(fire_and_forget_tweet("my tweet"))
            .await
            .expect("send");

        // Join first so the assertions do not race the consumer.
        cancel.cancel();
        handle.await.expect("join");

        assert_eq!(executor.call_count(), 0);
        assert_eq!(approval.item_count(), 1);
    }
}
839}