discord_webhook_proxy/api/
webhook_queue.rs1use super::discord::forward_webhook_request;
2use rocket::serde::{Deserialize, Serialize};
3use sled::Db;
4use std::sync::Arc;
5use tokio::{
6 sync::{
7 mpsc::{channel, Receiver, Sender},
8 Semaphore,
9 },
10 task,
11 time::{sleep, Duration},
12};
13
14const FALLBACK_COOLDOWN_SECS: u64 = 10;
15const QUEUE_SIZE: usize = 50_000;
16const CONCURRENCY_LIMIT: usize = 10;
17
18pub type QueueSender = Sender<Webhook>;
19pub type QueueReceiver = Receiver<Webhook>;
20
21#[derive(Serialize, Deserialize)]
22#[serde(crate = "rocket::serde")]
23pub struct Webhook {
24 pub id: u64,
25 pub token: String,
26 pub body: String,
27}
28
29pub fn start_webhook_queue() -> QueueSender {
30 let (queue_sender, queue_receiver) = channel::<Webhook>(QUEUE_SIZE);
31 let db = sled::open("webhook-proxy-queue-db").expect("Failed to open sled database");
32
33 tokio::spawn(async move {
34 webhook_queue_handler(queue_receiver, Arc::new(db)).await;
35 });
36
37 queue_sender
38}
39
40async fn webhook_queue_handler(mut queue_receiver: QueueReceiver, db: Arc<Db>) {
42 let concurrency_limiter = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));
43
44 while let Some(webhook) = queue_receiver.recv().await {
45 let permit = concurrency_limiter.clone().acquire_owned().await.unwrap();
46 let db = db.clone();
47
48 task::spawn(async move {
49 let id = db.generate_id().expect("Failed to generate id");
50
51 db.insert(
52 id.to_be_bytes(),
53 serde_json::to_vec(&webhook).expect("Failed to deserialize webhook"),
54 )
55 .expect("Failed to add webhook to queue");
56
57 loop {
58 let response = forward_webhook_request(webhook.id, &webhook.token, &webhook.body);
59
60 match response {
61 Ok((status, _, response)) => match status.code {
62 429 => {
63 let retry_after = response
64 .headers
65 .get("Retry-After")
66 .and_then(|retry_after| retry_after.parse::<u64>().ok())
67 .unwrap_or(FALLBACK_COOLDOWN_SECS);
68
69 sleep(Duration::from_secs(retry_after)).await;
70
71 println!("queueed");
72 },
73 _ => {
74 db.remove(id.to_be_bytes())
75 .expect("Failed to remove webhook from queue");
76
77 println!("queue done");
78
79 break;
80 },
81 },
82 Err(_) => continue,
83 }
84 }
85
86 drop(permit);
87 });
88 }
89}