tuitbot_core/automation/posting_queue/
dispatch.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use rand::Rng;
5use tokio::sync::mpsc;
6use tokio_util::sync::CancellationToken;
7
8use crate::automation::circuit_breaker::CircuitBreaker;
9use crate::automation::posting_queue::queue::{ApprovalQueue, PostAction, PostExecutor};
10
11pub async fn run_posting_queue(
19 receiver: mpsc::Receiver<PostAction>,
20 executor: Arc<dyn PostExecutor>,
21 min_delay: Duration,
22 cancel: CancellationToken,
23) {
24 run_posting_queue_with_approval(receiver, executor, None, min_delay, min_delay, None, cancel)
25 .await;
26}
27
28pub async fn run_posting_queue_with_approval(
34 mut receiver: mpsc::Receiver<PostAction>,
35 executor: Arc<dyn PostExecutor>,
36 approval_queue: Option<Arc<dyn ApprovalQueue>>,
37 min_delay: Duration,
38 max_delay: Duration,
39 circuit_breaker: Option<Arc<CircuitBreaker>>,
40 cancel: CancellationToken,
41) {
42 tracing::info!("Posting queue consumer started");
43
44 loop {
45 let action = tokio::select! {
46 biased;
47 _ = cancel.cancelled() => {
48 tracing::info!("Posting queue received cancellation, draining remaining actions");
49 break;
50 }
51 action = receiver.recv() => {
52 match action {
53 Some(a) => a,
54 None => {
55 tracing::info!("Posting queue channel closed");
56 break;
57 }
58 }
59 }
60 };
61
62 if approval_queue.is_none() {
64 if let Some(ref cb) = circuit_breaker {
65 if !cb.should_allow_mutation().await {
66 tracing::warn!("Circuit breaker open — waiting before posting");
67 if !cb.wait_until_closed(&cancel).await {
68 tracing::info!("Cancelled while waiting for circuit breaker");
69 break;
70 }
71 }
72 }
73 }
74
75 let result = execute_or_queue(action, &executor, &approval_queue).await;
76
77 if approval_queue.is_none() {
79 if let Some(ref cb) = circuit_breaker {
80 match result {
81 PostResult::Success => {
82 cb.record_success().await;
83 }
84 PostResult::Error(ref msg) if is_rate_limit_error(msg) => {
85 cb.record_error().await;
86 }
87 _ => {}
88 }
89 }
90 }
91
92 let delay = randomized_delay(min_delay, max_delay);
93 if !delay.is_zero() {
94 tokio::time::sleep(delay).await;
95 }
96 }
97
98 let mut drained = 0u32;
100 while let Ok(action) = receiver.try_recv() {
101 execute_or_queue(action, &executor, &approval_queue).await;
102 drained += 1;
103 }
104
105 if drained > 0 {
106 tracing::info!(
107 count = drained,
108 "Drained remaining actions from posting queue"
109 );
110 }
111
112 tracing::info!("Posting queue consumer stopped");
113}
114
/// Reports whether an error message looks like a rate-limit style failure.
///
/// The check is a case-insensitive substring search for any of: `"rate limit"`,
/// `"too many requests"`, `"429"`, `"forbidden"`, or `"403"`. Because it is a plain
/// substring match, the numeric markers also match when they appear inside a longer
/// number (for example `"14290"`).
pub fn is_rate_limit_error(msg: &str) -> bool {
    const MARKERS: [&str; 5] = [
        "rate limit",
        "too many requests",
        "429",
        "forbidden",
        "403",
    ];

    let haystack = msg.to_lowercase();
    MARKERS.iter().any(|&marker| haystack.contains(marker))
}
124
/// Outcome of handling a single post action inside the queue consumer.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so outcomes can be logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
enum PostResult {
    /// The executor reported success for the action.
    Success,
    /// The executor reported a failure; carries the error message.
    Error(String),
    /// The action was routed to the approval queue instead of being executed.
    Queued,
}
131
132async fn execute_or_queue(
134 action: PostAction,
135 executor: &Arc<dyn PostExecutor>,
136 approval_queue: &Option<Arc<dyn ApprovalQueue>>,
137) -> PostResult {
138 if let Some(queue) = approval_queue {
139 queue_for_approval(action, queue).await;
140 PostResult::Queued
141 } else {
142 execute_and_respond(action, executor).await
143 }
144}
145
146async fn queue_for_approval(action: PostAction, queue: &Arc<dyn ApprovalQueue>) {
148 let (result, result_tx) = match action {
149 PostAction::Reply {
150 tweet_id,
151 content,
152 media_ids: _,
153 result_tx,
154 } => {
155 tracing::info!(tweet_id = %tweet_id, "Queuing reply for approval");
156 let r = queue
157 .queue_reply(&tweet_id, &content, &[])
158 .await
159 .map(|id| format!("queued:{id}"));
160 (r, result_tx)
161 }
162 PostAction::Tweet {
163 content,
164 media_ids: _,
165 result_tx,
166 } => {
167 tracing::info!("Queuing tweet for approval");
168 let r = queue
169 .queue_tweet(&content, &[])
170 .await
171 .map(|id| format!("queued:{id}"));
172 (r, result_tx)
173 }
174 PostAction::ThreadTweet {
175 content,
176 in_reply_to,
177 media_ids: _,
178 result_tx,
179 } => {
180 tracing::info!(in_reply_to = %in_reply_to, "Queuing thread tweet for approval");
181 let r = queue
182 .queue_reply(&in_reply_to, &content, &[])
183 .await
184 .map(|id| format!("queued:{id}"));
185 (r, result_tx)
186 }
187 };
188
189 match &result {
190 Ok(id) => tracing::info!(queue_id = %id, "Action queued for approval"),
191 Err(e) => tracing::warn!(error = %e, "Failed to queue action for approval"),
192 }
193
194 if let Some(tx) = result_tx {
195 let _ = tx.send(result);
196 }
197}
198
199async fn execute_and_respond(action: PostAction, executor: &Arc<dyn PostExecutor>) -> PostResult {
201 let (result, result_tx) = match action {
202 PostAction::Reply {
203 tweet_id,
204 content,
205 media_ids,
206 result_tx,
207 } => {
208 tracing::debug!(tweet_id = %tweet_id, "Executing reply action");
209 let r = executor
210 .execute_reply(&tweet_id, &content, &media_ids)
211 .await;
212 (r, result_tx)
213 }
214 PostAction::Tweet {
215 content,
216 media_ids,
217 result_tx,
218 } => {
219 tracing::debug!("Executing tweet action");
220 let r = executor.execute_tweet(&content, &media_ids).await;
221 (r, result_tx)
222 }
223 PostAction::ThreadTweet {
224 content,
225 in_reply_to,
226 media_ids,
227 result_tx,
228 } => {
229 tracing::debug!(in_reply_to = %in_reply_to, "Executing thread tweet action");
230 let r = executor
231 .execute_reply(&in_reply_to, &content, &media_ids)
232 .await;
233 (r, result_tx)
234 }
235 };
236
237 let post_result = match &result {
238 Ok(id) => {
239 tracing::info!(tweet_id = %id, "Post action succeeded");
240 PostResult::Success
241 }
242 Err(e) => {
243 tracing::warn!(error = %e, "Post action failed");
244 PostResult::Error(e.clone())
245 }
246 };
247
248 if let Some(tx) = result_tx {
249 let _ = tx.send(result);
251 }
252
253 post_result
254}
255
256pub fn randomized_delay(min: Duration, max: Duration) -> Duration {
258 if min >= max || min.is_zero() && max.is_zero() {
259 return min;
260 }
261 let min_ms = min.as_millis() as u64;
262 let max_ms = max.as_millis() as u64;
263 Duration::from_millis(rand::rng().random_range(min_ms..=max_ms))
264}