Skip to main content

tuitbot_core/automation/posting_queue/
dispatch.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use rand::Rng;
5use tokio::sync::mpsc;
6use tokio_util::sync::CancellationToken;
7
8use crate::automation::circuit_breaker::CircuitBreaker;
9use crate::automation::posting_queue::queue::{ApprovalQueue, PostAction, PostExecutor};
10
11/// Run the posting queue consumer loop.
12///
13/// Processes actions sequentially with `min_delay` between each post.
14/// On cancellation, drains remaining actions in the channel before exiting.
15///
16/// When `approval_queue` is `Some`, actions are queued for human review
17/// instead of being posted directly.
18pub async fn run_posting_queue(
19    receiver: mpsc::Receiver<PostAction>,
20    executor: Arc<dyn PostExecutor>,
21    min_delay: Duration,
22    cancel: CancellationToken,
23) {
24    run_posting_queue_with_approval(receiver, executor, None, min_delay, min_delay, None, cancel)
25        .await;
26}
27
28/// Run the posting queue consumer loop with optional approval mode.
29///
30/// Delay between posts is randomized uniformly in `[min_delay, max_delay]`.
31/// If a `circuit_breaker` is provided, mutations are gated: the queue blocks
32/// while the breaker is Open, and errors/successes are recorded.
33pub async fn run_posting_queue_with_approval(
34    mut receiver: mpsc::Receiver<PostAction>,
35    executor: Arc<dyn PostExecutor>,
36    approval_queue: Option<Arc<dyn ApprovalQueue>>,
37    min_delay: Duration,
38    max_delay: Duration,
39    circuit_breaker: Option<Arc<CircuitBreaker>>,
40    cancel: CancellationToken,
41) {
42    tracing::info!("Posting queue consumer started");
43
44    loop {
45        let action = tokio::select! {
46            biased;
47            _ = cancel.cancelled() => {
48                tracing::info!("Posting queue received cancellation, draining remaining actions");
49                break;
50            }
51            action = receiver.recv() => {
52                match action {
53                    Some(a) => a,
54                    None => {
55                        tracing::info!("Posting queue channel closed");
56                        break;
57                    }
58                }
59            }
60        };
61
62        // Gate on circuit breaker (only for direct execution, not approval queue).
63        if approval_queue.is_none() {
64            if let Some(ref cb) = circuit_breaker {
65                if !cb.should_allow_mutation().await {
66                    tracing::warn!("Circuit breaker open — waiting before posting");
67                    if !cb.wait_until_closed(&cancel).await {
68                        tracing::info!("Cancelled while waiting for circuit breaker");
69                        break;
70                    }
71                }
72            }
73        }
74
75        let result = execute_or_queue(action, &executor, &approval_queue).await;
76
77        // Record result in circuit breaker.
78        if approval_queue.is_none() {
79            if let Some(ref cb) = circuit_breaker {
80                match result {
81                    PostResult::Success => {
82                        cb.record_success().await;
83                    }
84                    PostResult::Error(ref msg) if is_rate_limit_error(msg) => {
85                        cb.record_error().await;
86                    }
87                    _ => {}
88                }
89            }
90        }
91
92        let delay = randomized_delay(min_delay, max_delay);
93        if !delay.is_zero() {
94            tokio::time::sleep(delay).await;
95        }
96    }
97
98    // Drain remaining actions after cancellation or channel close.
99    let mut drained = 0u32;
100    while let Ok(action) = receiver.try_recv() {
101        execute_or_queue(action, &executor, &approval_queue).await;
102        drained += 1;
103    }
104
105    if drained > 0 {
106        tracing::info!(
107            count = drained,
108            "Drained remaining actions from posting queue"
109        );
110    }
111
112    tracing::info!("Posting queue consumer stopped");
113}
114
115/// Whether an error message indicates a rate limit or forbidden response.
116pub fn is_rate_limit_error(msg: &str) -> bool {
117    let lower = msg.to_lowercase();
118    lower.contains("rate limit")
119        || lower.contains("too many requests")
120        || lower.contains("429")
121        || lower.contains("forbidden")
122        || lower.contains("403")
123}
124
125/// Outcome of a post action (for circuit breaker tracking).
126enum PostResult {
127    Success,
128    Error(String),
129    Queued,
130}
131
132/// Route a post action: queue for approval if approval mode is on, otherwise execute.
133async fn execute_or_queue(
134    action: PostAction,
135    executor: &Arc<dyn PostExecutor>,
136    approval_queue: &Option<Arc<dyn ApprovalQueue>>,
137) -> PostResult {
138    if let Some(queue) = approval_queue {
139        queue_for_approval(action, queue).await;
140        PostResult::Queued
141    } else {
142        execute_and_respond(action, executor).await
143    }
144}
145
146/// Queue a post action for human approval instead of posting.
147async fn queue_for_approval(action: PostAction, queue: &Arc<dyn ApprovalQueue>) {
148    let (result, result_tx) = match action {
149        PostAction::Reply {
150            tweet_id,
151            content,
152            media_ids: _,
153            result_tx,
154        } => {
155            tracing::info!(tweet_id = %tweet_id, "Queuing reply for approval");
156            let r = queue
157                .queue_reply(&tweet_id, &content, &[])
158                .await
159                .map(|id| format!("queued:{id}"));
160            (r, result_tx)
161        }
162        PostAction::Tweet {
163            content,
164            media_ids: _,
165            result_tx,
166        } => {
167            tracing::info!("Queuing tweet for approval");
168            let r = queue
169                .queue_tweet(&content, &[])
170                .await
171                .map(|id| format!("queued:{id}"));
172            (r, result_tx)
173        }
174        PostAction::ThreadTweet {
175            content,
176            in_reply_to,
177            media_ids: _,
178            result_tx,
179        } => {
180            tracing::info!(in_reply_to = %in_reply_to, "Queuing thread tweet for approval");
181            let r = queue
182                .queue_reply(&in_reply_to, &content, &[])
183                .await
184                .map(|id| format!("queued:{id}"));
185            (r, result_tx)
186        }
187    };
188
189    match &result {
190        Ok(id) => tracing::info!(queue_id = %id, "Action queued for approval"),
191        Err(e) => tracing::warn!(error = %e, "Failed to queue action for approval"),
192    }
193
194    if let Some(tx) = result_tx {
195        let _ = tx.send(result);
196    }
197}
198
199/// Execute a single post action and send the result back via oneshot.
200async fn execute_and_respond(action: PostAction, executor: &Arc<dyn PostExecutor>) -> PostResult {
201    let (result, result_tx) = match action {
202        PostAction::Reply {
203            tweet_id,
204            content,
205            media_ids,
206            result_tx,
207        } => {
208            tracing::debug!(tweet_id = %tweet_id, "Executing reply action");
209            let r = executor
210                .execute_reply(&tweet_id, &content, &media_ids)
211                .await;
212            (r, result_tx)
213        }
214        PostAction::Tweet {
215            content,
216            media_ids,
217            result_tx,
218        } => {
219            tracing::debug!("Executing tweet action");
220            let r = executor.execute_tweet(&content, &media_ids).await;
221            (r, result_tx)
222        }
223        PostAction::ThreadTweet {
224            content,
225            in_reply_to,
226            media_ids,
227            result_tx,
228        } => {
229            tracing::debug!(in_reply_to = %in_reply_to, "Executing thread tweet action");
230            let r = executor
231                .execute_reply(&in_reply_to, &content, &media_ids)
232                .await;
233            (r, result_tx)
234        }
235    };
236
237    let post_result = match &result {
238        Ok(id) => {
239            tracing::info!(tweet_id = %id, "Post action succeeded");
240            PostResult::Success
241        }
242        Err(e) => {
243            tracing::warn!(error = %e, "Post action failed");
244            PostResult::Error(e.clone())
245        }
246    };
247
248    if let Some(tx) = result_tx {
249        // Ignore send error (receiver may have been dropped).
250        let _ = tx.send(result);
251    }
252
253    post_result
254}
255
256/// Compute a randomized delay between `min` and `max`.
257pub fn randomized_delay(min: Duration, max: Duration) -> Duration {
258    if min >= max || min.is_zero() && max.is_zero() {
259        return min;
260    }
261    let min_ms = min.as_millis() as u64;
262    let max_ms = max.as_millis() as u64;
263    Duration::from_millis(rand::rng().random_range(min_ms..=max_ms))
264}