discord_webhook_proxy/api/
webhook_queue.rs

1use super::discord::forward_webhook_request;
2use rocket::serde::{Deserialize, Serialize};
3use sled::Db;
4use std::sync::Arc;
5use tokio::{
6    sync::{
7        mpsc::{channel, Receiver, Sender},
8        Semaphore,
9    },
10    task,
11    time::{sleep, Duration},
12};
13
/// Seconds to wait before retrying a delivery when no usable `Retry-After` value is available.
const FALLBACK_COOLDOWN_SECS: u64 = 10;
/// Capacity of the bounded channel that buffers webhooks waiting to be handled.
const QUEUE_SIZE: usize = 50_000;
/// Maximum number of webhooks that may be in delivery at the same time.
const CONCURRENCY_LIMIT: usize = 10;
17
18pub type QueueSender = Sender<Webhook>;
19pub type QueueReceiver = Receiver<Webhook>;
20
21#[derive(Serialize, Deserialize)]
22#[serde(crate = "rocket::serde")]
23pub struct Webhook {
24    pub id: u64,
25    pub token: String,
26    pub body: String,
27}
28
29pub fn start_webhook_queue() -> QueueSender {
30    let (queue_sender, queue_receiver) = channel::<Webhook>(QUEUE_SIZE);
31    let db = sled::open("webhook-proxy-queue-db").expect("Failed to open sled database");
32
33    tokio::spawn(async move {
34        webhook_queue_handler(queue_receiver, Arc::new(db)).await;
35    });
36
37    queue_sender
38}
39
40// TODO: idk take a look at the expects maybe i doubt sum will go wrong tho
41async fn webhook_queue_handler(mut queue_receiver: QueueReceiver, db: Arc<Db>) {
42    let concurrency_limiter = Arc::new(Semaphore::new(CONCURRENCY_LIMIT));
43
44    while let Some(webhook) = queue_receiver.recv().await {
45        let permit = concurrency_limiter.clone().acquire_owned().await.unwrap();
46        let db = db.clone();
47
48        task::spawn(async move {
49            let id = db.generate_id().expect("Failed to generate id");
50
51            db.insert(
52                id.to_be_bytes(),
53                serde_json::to_vec(&webhook).expect("Failed to deserialize webhook"),
54            )
55                .expect("Failed to add webhook to queue");
56
57            loop {
58                let response = forward_webhook_request(webhook.id, &webhook.token, &webhook.body);
59
60                match response {
61                    Ok((status, _, response)) => match status.code {
62                        429 => {
63                            let retry_after = response
64                                .headers
65                                .get("Retry-After")
66                                .and_then(|retry_after| retry_after.parse::<u64>().ok())
67                                .unwrap_or(FALLBACK_COOLDOWN_SECS);
68
69                            sleep(Duration::from_secs(retry_after)).await;
70
71                            println!("queueed");
72                        },
73                        _ => {
74                            db.remove(id.to_be_bytes())
75                                .expect("Failed to remove webhook from queue");
76
77                            println!("queue done");
78
79                            break;
80                        },
81                    },
82                    Err(_) => continue,
83                }
84            }
85
86            drop(permit);
87        });
88    }
89}