1use rand::Rng;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::{mpsc, oneshot};
12use tokio_util::sync::CancellationToken;
13
/// Buffer size of the posting channel: the capacity passed to `mpsc::channel`
/// by [`create_posting_queue`].
pub const QUEUE_CAPACITY: usize = 100;
16
/// A posting request sent through the posting queue.
///
/// Every variant carries an optional `result_tx`. When it is `Some`, the
/// consumer sends the outcome of the action back on it: `Ok` with the string
/// returned by the executor (or `queued:<id>` when the action was routed to an
/// approval queue), or `Err` with the error message.
pub enum PostAction {
    /// Reply to an existing tweet.
    Reply {
        /// ID of the tweet being replied to.
        tweet_id: String,
        /// Text of the reply.
        content: String,
        /// Optional one-shot channel on which the outcome is reported.
        result_tx: Option<oneshot::Sender<Result<String, String>>>,
    },
    /// Post a standalone tweet.
    Tweet {
        /// Text of the tweet.
        content: String,
        /// Optional one-shot channel on which the outcome is reported.
        result_tx: Option<oneshot::Sender<Result<String, String>>>,
    },
    /// Post a tweet as a reply to `in_reply_to`.
    ///
    /// The consumer handles this through the same reply calls as [`PostAction::Reply`].
    ThreadTweet {
        /// Text of the tweet.
        content: String,
        /// ID of the tweet this one replies to.
        in_reply_to: String,
        /// Optional one-shot channel on which the outcome is reported.
        result_tx: Option<oneshot::Sender<Result<String, String>>>,
    },
}
48
49impl std::fmt::Debug for PostAction {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 PostAction::Reply {
53 tweet_id, content, ..
54 } => f
55 .debug_struct("Reply")
56 .field("tweet_id", tweet_id)
57 .field("content_len", &content.len())
58 .finish(),
59 PostAction::Tweet { content, .. } => f
60 .debug_struct("Tweet")
61 .field("content_len", &content.len())
62 .finish(),
63 PostAction::ThreadTweet {
64 content,
65 in_reply_to,
66 ..
67 } => f
68 .debug_struct("ThreadTweet")
69 .field("in_reply_to", in_reply_to)
70 .field("content_len", &content.len())
71 .finish(),
72 }
73 }
74}
75
/// Performs the actual posting for actions taken off the posting queue.
///
/// Both methods return `Ok` with a string the queue consumer logs as the
/// resulting tweet ID, or `Err` with an error message.
#[async_trait::async_trait]
pub trait PostExecutor: Send + Sync {
    /// Posts `content` as a reply to the tweet identified by `tweet_id`.
    async fn execute_reply(&self, tweet_id: &str, content: &str) -> Result<String, String>;

    /// Posts `content` as a standalone tweet.
    async fn execute_tweet(&self, content: &str) -> Result<String, String>;
}
88
89pub fn create_posting_queue() -> (mpsc::Sender<PostAction>, mpsc::Receiver<PostAction>) {
94 mpsc::channel(QUEUE_CAPACITY)
95}
96
/// Destination for actions that are queued for approval instead of being
/// posted directly.
///
/// On success both methods return an `i64` ID, which the queue consumer
/// reports back to the sender as `queued:<id>`.
#[async_trait::async_trait]
pub trait ApprovalQueue: Send + Sync {
    /// Queues a reply to `tweet_id` with the given `content`.
    async fn queue_reply(&self, tweet_id: &str, content: &str) -> Result<i64, String>;

    /// Queues a standalone tweet with the given `content`.
    async fn queue_tweet(&self, content: &str) -> Result<i64, String>;
}
106
107pub async fn run_posting_queue(
115 receiver: mpsc::Receiver<PostAction>,
116 executor: Arc<dyn PostExecutor>,
117 min_delay: Duration,
118 cancel: CancellationToken,
119) {
120 run_posting_queue_with_approval(receiver, executor, None, min_delay, min_delay, cancel).await;
121}
122
123pub async fn run_posting_queue_with_approval(
127 mut receiver: mpsc::Receiver<PostAction>,
128 executor: Arc<dyn PostExecutor>,
129 approval_queue: Option<Arc<dyn ApprovalQueue>>,
130 min_delay: Duration,
131 max_delay: Duration,
132 cancel: CancellationToken,
133) {
134 tracing::info!("Posting queue consumer started");
135
136 loop {
137 let action = tokio::select! {
138 biased;
139 _ = cancel.cancelled() => {
140 tracing::info!("Posting queue received cancellation, draining remaining actions");
141 break;
142 }
143 action = receiver.recv() => {
144 match action {
145 Some(a) => a,
146 None => {
147 tracing::info!("Posting queue channel closed");
148 break;
149 }
150 }
151 }
152 };
153
154 execute_or_queue(action, &executor, &approval_queue).await;
155
156 let delay = randomized_delay(min_delay, max_delay);
157 if !delay.is_zero() {
158 tokio::time::sleep(delay).await;
159 }
160 }
161
162 let mut drained = 0u32;
164 while let Ok(action) = receiver.try_recv() {
165 execute_or_queue(action, &executor, &approval_queue).await;
166 drained += 1;
167 }
168
169 if drained > 0 {
170 tracing::info!(
171 count = drained,
172 "Drained remaining actions from posting queue"
173 );
174 }
175
176 tracing::info!("Posting queue consumer stopped");
177}
178
179async fn execute_or_queue(
181 action: PostAction,
182 executor: &Arc<dyn PostExecutor>,
183 approval_queue: &Option<Arc<dyn ApprovalQueue>>,
184) {
185 if let Some(queue) = approval_queue {
186 queue_for_approval(action, queue).await;
187 } else {
188 execute_and_respond(action, executor).await;
189 }
190}
191
192async fn queue_for_approval(action: PostAction, queue: &Arc<dyn ApprovalQueue>) {
194 let (result, result_tx) = match action {
195 PostAction::Reply {
196 tweet_id,
197 content,
198 result_tx,
199 } => {
200 tracing::info!(tweet_id = %tweet_id, "Queuing reply for approval");
201 let r = queue
202 .queue_reply(&tweet_id, &content)
203 .await
204 .map(|id| format!("queued:{id}"));
205 (r, result_tx)
206 }
207 PostAction::Tweet {
208 content, result_tx, ..
209 } => {
210 tracing::info!("Queuing tweet for approval");
211 let r = queue
212 .queue_tweet(&content)
213 .await
214 .map(|id| format!("queued:{id}"));
215 (r, result_tx)
216 }
217 PostAction::ThreadTweet {
218 content,
219 in_reply_to,
220 result_tx,
221 } => {
222 tracing::info!(in_reply_to = %in_reply_to, "Queuing thread tweet for approval");
223 let r = queue
224 .queue_reply(&in_reply_to, &content)
225 .await
226 .map(|id| format!("queued:{id}"));
227 (r, result_tx)
228 }
229 };
230
231 match &result {
232 Ok(id) => tracing::info!(queue_id = %id, "Action queued for approval"),
233 Err(e) => tracing::warn!(error = %e, "Failed to queue action for approval"),
234 }
235
236 if let Some(tx) = result_tx {
237 let _ = tx.send(result);
238 }
239}
240
241async fn execute_and_respond(action: PostAction, executor: &Arc<dyn PostExecutor>) {
243 let (result, result_tx) = match action {
244 PostAction::Reply {
245 tweet_id,
246 content,
247 result_tx,
248 } => {
249 tracing::debug!(tweet_id = %tweet_id, "Executing reply action");
250 let r = executor.execute_reply(&tweet_id, &content).await;
251 (r, result_tx)
252 }
253 PostAction::Tweet {
254 content, result_tx, ..
255 } => {
256 tracing::debug!("Executing tweet action");
257 let r = executor.execute_tweet(&content).await;
258 (r, result_tx)
259 }
260 PostAction::ThreadTweet {
261 content,
262 in_reply_to,
263 result_tx,
264 } => {
265 tracing::debug!(in_reply_to = %in_reply_to, "Executing thread tweet action");
266 let r = executor.execute_reply(&in_reply_to, &content).await;
267 (r, result_tx)
268 }
269 };
270
271 match &result {
272 Ok(id) => tracing::info!(tweet_id = %id, "Post action succeeded"),
273 Err(e) => tracing::warn!(error = %e, "Post action failed"),
274 }
275
276 if let Some(tx) = result_tx {
277 let _ = tx.send(result);
279 }
280}
281
282fn randomized_delay(min: Duration, max: Duration) -> Duration {
284 if min >= max || min.is_zero() && max.is_zero() {
285 return min;
286 }
287 let min_ms = min.as_millis() as u64;
288 let max_ms = max.as_millis() as u64;
289 Duration::from_millis(rand::thread_rng().gen_range(min_ms..=max_ms))
290}
291
292#[cfg(test)]
293mod tests {
294 use super::*;
295 use std::sync::Mutex;
296
297 struct MockExecutor {
299 calls: Mutex<Vec<(String, String)>>,
300 fail: bool,
301 }
302
303 impl MockExecutor {
304 fn new() -> Self {
305 Self {
306 calls: Mutex::new(Vec::new()),
307 fail: false,
308 }
309 }
310
311 fn failing() -> Self {
312 Self {
313 calls: Mutex::new(Vec::new()),
314 fail: true,
315 }
316 }
317
318 fn call_count(&self) -> usize {
319 self.calls.lock().expect("lock poisoned").len()
320 }
321
322 fn calls(&self) -> Vec<(String, String)> {
323 self.calls.lock().expect("lock poisoned").clone()
324 }
325 }
326
327 #[async_trait::async_trait]
328 impl PostExecutor for MockExecutor {
329 async fn execute_reply(&self, tweet_id: &str, content: &str) -> Result<String, String> {
330 self.calls
331 .lock()
332 .expect("lock poisoned")
333 .push(("reply".to_string(), format!("{tweet_id}:{content}")));
334 if self.fail {
335 Err("mock error".to_string())
336 } else {
337 Ok("reply-id-123".to_string())
338 }
339 }
340
341 async fn execute_tweet(&self, content: &str) -> Result<String, String> {
342 self.calls
343 .lock()
344 .expect("lock poisoned")
345 .push(("tweet".to_string(), content.to_string()));
346 if self.fail {
347 Err("mock error".to_string())
348 } else {
349 Ok("tweet-id-456".to_string())
350 }
351 }
352 }
353
354 #[tokio::test]
355 async fn process_reply_action() {
356 let executor = Arc::new(MockExecutor::new());
357 let (tx, rx) = create_posting_queue();
358 let cancel = CancellationToken::new();
359
360 let cancel_clone = cancel.clone();
361 let exec_clone = executor.clone();
362 let handle = tokio::spawn(async move {
363 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
364 });
365
366 let (result_tx, result_rx) = oneshot::channel();
367 tx.send(PostAction::Reply {
368 tweet_id: "t1".to_string(),
369 content: "hello".to_string(),
370 result_tx: Some(result_tx),
371 })
372 .await
373 .expect("send failed");
374
375 let result = result_rx.await.expect("oneshot recv");
376 assert_eq!(result, Ok("reply-id-123".to_string()));
377
378 cancel.cancel();
379 handle.await.expect("join");
380 assert_eq!(executor.call_count(), 1);
381 }
382
383 #[tokio::test]
384 async fn process_tweet_action() {
385 let executor = Arc::new(MockExecutor::new());
386 let (tx, rx) = create_posting_queue();
387 let cancel = CancellationToken::new();
388
389 let cancel_clone = cancel.clone();
390 let exec_clone = executor.clone();
391 let handle = tokio::spawn(async move {
392 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
393 });
394
395 let (result_tx, result_rx) = oneshot::channel();
396 tx.send(PostAction::Tweet {
397 content: "my tweet".to_string(),
398 result_tx: Some(result_tx),
399 })
400 .await
401 .expect("send failed");
402
403 let result = result_rx.await.expect("oneshot recv");
404 assert_eq!(result, Ok("tweet-id-456".to_string()));
405
406 cancel.cancel();
407 handle.await.expect("join");
408 }
409
410 #[tokio::test]
411 async fn process_thread_tweet_action() {
412 let executor = Arc::new(MockExecutor::new());
413 let (tx, rx) = create_posting_queue();
414 let cancel = CancellationToken::new();
415
416 let cancel_clone = cancel.clone();
417 let exec_clone = executor.clone();
418 let handle = tokio::spawn(async move {
419 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
420 });
421
422 let (result_tx, result_rx) = oneshot::channel();
423 tx.send(PostAction::ThreadTweet {
424 content: "thread part 2".to_string(),
425 in_reply_to: "prev-id".to_string(),
426 result_tx: Some(result_tx),
427 })
428 .await
429 .expect("send failed");
430
431 let result = result_rx.await.expect("oneshot recv");
432 assert_eq!(result, Ok("reply-id-123".to_string()));
433
434 cancel.cancel();
435 handle.await.expect("join");
436
437 let calls = executor.calls();
438 assert_eq!(calls[0].0, "reply");
439 assert!(calls[0].1.contains("prev-id"));
440 }
441
442 #[tokio::test]
443 async fn result_tx_none_does_not_panic() {
444 let executor = Arc::new(MockExecutor::new());
445 let (tx, rx) = create_posting_queue();
446 let cancel = CancellationToken::new();
447
448 let cancel_clone = cancel.clone();
449 let exec_clone = executor.clone();
450 let handle = tokio::spawn(async move {
451 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
452 });
453
454 tx.send(PostAction::Tweet {
455 content: "fire and forget".to_string(),
456 result_tx: None,
457 })
458 .await
459 .expect("send failed");
460
461 tokio::time::sleep(Duration::from_millis(50)).await;
463
464 cancel.cancel();
465 handle.await.expect("join");
466 assert_eq!(executor.call_count(), 1);
467 }
468
469 #[tokio::test]
470 async fn failed_action_sends_error_back() {
471 let executor = Arc::new(MockExecutor::failing());
472 let (tx, rx) = create_posting_queue();
473 let cancel = CancellationToken::new();
474
475 let cancel_clone = cancel.clone();
476 let exec_clone = executor.clone();
477 let handle = tokio::spawn(async move {
478 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
479 });
480
481 let (result_tx, result_rx) = oneshot::channel();
482 tx.send(PostAction::Tweet {
483 content: "will fail".to_string(),
484 result_tx: Some(result_tx),
485 })
486 .await
487 .expect("send failed");
488
489 let result = result_rx.await.expect("oneshot recv");
490 assert!(result.is_err());
491 assert_eq!(result.unwrap_err(), "mock error");
492
493 cancel.cancel();
494 handle.await.expect("join");
495 }
496
497 #[tokio::test]
498 async fn channel_close_exits_consumer() {
499 let executor = Arc::new(MockExecutor::new());
500 let (tx, rx) = create_posting_queue();
501 let cancel = CancellationToken::new();
502
503 let handle = tokio::spawn(async move {
504 run_posting_queue(rx, executor, Duration::ZERO, cancel).await;
505 });
506
507 drop(tx);
509
510 handle.await.expect("join");
512 }
513
514 #[tokio::test]
515 async fn drain_on_cancel() {
516 let executor = Arc::new(MockExecutor::new());
517 let (tx, rx) = create_posting_queue();
518 let cancel = CancellationToken::new();
519
520 tx.send(PostAction::Tweet {
522 content: "queued1".to_string(),
523 result_tx: None,
524 })
525 .await
526 .expect("send");
527 tx.send(PostAction::Tweet {
528 content: "queued2".to_string(),
529 result_tx: None,
530 })
531 .await
532 .expect("send");
533
534 cancel.cancel();
536
537 let exec_clone = executor.clone();
538 let handle = tokio::spawn(async move {
539 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel).await;
540 });
541
542 handle.await.expect("join");
543
544 assert_eq!(executor.call_count(), 2);
546 }
547
548 #[tokio::test]
549 async fn multiple_actions_processed_in_order() {
550 let executor = Arc::new(MockExecutor::new());
551 let (tx, rx) = create_posting_queue();
552 let cancel = CancellationToken::new();
553
554 let cancel_clone = cancel.clone();
555 let exec_clone = executor.clone();
556 let handle = tokio::spawn(async move {
557 run_posting_queue(rx, exec_clone, Duration::ZERO, cancel_clone).await;
558 });
559
560 for i in 0..5 {
561 tx.send(PostAction::Tweet {
562 content: format!("tweet-{i}"),
563 result_tx: None,
564 })
565 .await
566 .expect("send");
567 }
568
569 tokio::time::sleep(Duration::from_millis(100)).await;
571
572 cancel.cancel();
573 handle.await.expect("join");
574
575 let calls = executor.calls();
576 assert_eq!(calls.len(), 5);
577 for (i, (action_type, content)) in calls.iter().enumerate() {
578 assert_eq!(action_type, "tweet");
579 assert_eq!(content, &format!("tweet-{i}"));
580 }
581 }
582
583 #[test]
584 fn post_action_debug_format() {
585 let action = PostAction::Reply {
586 tweet_id: "123".to_string(),
587 content: "hello world".to_string(),
588 result_tx: None,
589 };
590 let debug = format!("{action:?}");
591 assert!(debug.contains("Reply"));
592 assert!(debug.contains("123"));
593 }
594
595 struct MockApprovalQueue {
598 items: Mutex<Vec<(String, String, String)>>,
599 }
600
601 impl MockApprovalQueue {
602 fn new() -> Self {
603 Self {
604 items: Mutex::new(Vec::new()),
605 }
606 }
607
608 fn item_count(&self) -> usize {
609 self.items.lock().expect("lock").len()
610 }
611 }
612
613 #[async_trait::async_trait]
614 impl ApprovalQueue for MockApprovalQueue {
615 async fn queue_reply(&self, tweet_id: &str, content: &str) -> Result<i64, String> {
616 self.items.lock().expect("lock").push((
617 "reply".to_string(),
618 tweet_id.to_string(),
619 content.to_string(),
620 ));
621 Ok(self.item_count() as i64)
622 }
623
624 async fn queue_tweet(&self, content: &str) -> Result<i64, String> {
625 self.items.lock().expect("lock").push((
626 "tweet".to_string(),
627 String::new(),
628 content.to_string(),
629 ));
630 Ok(self.item_count() as i64)
631 }
632 }
633
634 #[tokio::test]
635 async fn approval_mode_queues_instead_of_posting() {
636 let executor = Arc::new(MockExecutor::new());
637 let approval = Arc::new(MockApprovalQueue::new());
638 let (tx, rx) = create_posting_queue();
639 let cancel = CancellationToken::new();
640
641 let cancel_clone = cancel.clone();
642 let exec_clone = executor.clone();
643 let approval_clone = approval.clone();
644 let handle = tokio::spawn(async move {
645 run_posting_queue_with_approval(
646 rx,
647 exec_clone,
648 Some(approval_clone),
649 Duration::ZERO,
650 Duration::ZERO,
651 cancel_clone,
652 )
653 .await;
654 });
655
656 let (result_tx, result_rx) = oneshot::channel();
657 tx.send(PostAction::Reply {
658 tweet_id: "t1".to_string(),
659 content: "hello".to_string(),
660 result_tx: Some(result_tx),
661 })
662 .await
663 .expect("send");
664
665 let result = result_rx.await.expect("recv");
666 assert!(result.is_ok());
667 assert!(result.unwrap().starts_with("queued:"));
668
669 assert_eq!(executor.call_count(), 0);
671 assert_eq!(approval.item_count(), 1);
673
674 cancel.cancel();
675 handle.await.expect("join");
676 }
677
678 #[tokio::test]
679 async fn approval_mode_queues_tweets() {
680 let executor = Arc::new(MockExecutor::new());
681 let approval = Arc::new(MockApprovalQueue::new());
682 let (tx, rx) = create_posting_queue();
683 let cancel = CancellationToken::new();
684
685 let cancel_clone = cancel.clone();
686 let exec_clone = executor.clone();
687 let approval_clone = approval.clone();
688 let handle = tokio::spawn(async move {
689 run_posting_queue_with_approval(
690 rx,
691 exec_clone,
692 Some(approval_clone),
693 Duration::ZERO,
694 Duration::ZERO,
695 cancel_clone,
696 )
697 .await;
698 });
699
700 tx.send(PostAction::Tweet {
701 content: "my tweet".to_string(),
702 result_tx: None,
703 })
704 .await
705 .expect("send");
706
707 tokio::time::sleep(Duration::from_millis(50)).await;
708
709 assert_eq!(executor.call_count(), 0);
710 assert_eq!(approval.item_count(), 1);
711
712 cancel.cancel();
713 handle.await.expect("join");
714 }
715}