use super::{day_limiter::DayLimiter, Queue};
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot::{self, Sender},
},
time::sleep,
};
use twilight_http::Client;
/// Queue that spreads requests over several buckets, each drained by its own
/// background task, and gates every request behind a shared `DayLimiter`.
#[derive(Debug)]
pub struct LargeBotQueue {
/// One sender per bucket; the matching receiver is owned by the task spawned
/// for that bucket when the queue is created.
buckets: Vec<UnboundedSender<Sender<()>>>,
/// Limiter awaited before a request is handed to its bucket.
limiter: DayLimiter,
}
impl LargeBotQueue {
    /// Creates a queue backed by `buckets` independent buckets.
    ///
    /// One background task is spawned per bucket with `tokio::spawn`; each
    /// task releases the requests routed to its bucket one at a time. The
    /// `DayLimiter` is then initialised through `http`, and a summary of the
    /// identify usage is logged when the `INFO` level is enabled.
    ///
    /// A `buckets` value of `0` is treated as `1` (and a warning is logged).
    /// With no buckets at all, the first call to `request` would panic while
    /// computing the bucket index as a remainder by zero; a single bucket is
    /// the most restrictive valid configuration, so it is a safe fallback.
    ///
    /// # Panics
    ///
    /// Panics if the initial session limits cannot be retrieved.
    pub async fn new(buckets: usize, http: Arc<Client>) -> Self {
        let buckets = if buckets == 0 {
            tracing::warn!("bucket count of 0 is invalid, falling back to a single bucket");
            1
        } else {
            buckets
        };

        let mut queues = Vec::with_capacity(buckets);
        for _ in 0..buckets {
            let (tx, rx) = unbounded_channel();
            // The task ends on its own once `tx` (and thus the queue) is dropped.
            tokio::spawn(waiter(rx));
            queues.push(tx);
        }

        let limiter = DayLimiter::new(http).await.expect(
            "Getting the first session limits failed, \
             Is network connection available?",
        );

        // Only take the limiter's lock when the message will actually be emitted.
        if tracing::level_enabled!(tracing::Level::INFO) {
            let lock = limiter.0.lock().await;
            tracing::info!(
                "{}/{} identifies used before next reset in {:.2?}",
                lock.current,
                lock.total,
                lock.next_reset
            );
        }

        Self {
            buckets: queues,
            limiter,
        }
    }
}
/// Drains one bucket: releases queued requests one at a time, waiting six
/// seconds after every request that was actually released.
///
/// The loop ends when every sender for `rx` has been dropped.
async fn waiter(mut rx: UnboundedReceiver<Sender<()>>) {
    const DUR: Duration = Duration::from_secs(6);

    while let Some(req) = rx.recv().await {
        if let Err(source) = req.send(()) {
            // The requester dropped its receiving half before being released,
            // so nothing consumed this slot. Move straight on to the next
            // request instead of stalling the whole bucket for `DUR`.
            tracing::warn!("skipping, send failed with: {source:?}");
            continue;
        }

        sleep(DUR).await;
    }
}
impl Queue for LargeBotQueue {
    /// Queues a request for the shard `shard_id[0]` and returns a future that
    /// resolves once the request has been released.
    ///
    /// The bucket is chosen as `shard_id[0]` modulo the number of buckets. The
    /// returned future first awaits the limiter, then hands a one-shot sender
    /// to the bucket's task and waits for it to fire. If the bucket's task is
    /// gone, a warning is logged and the future resolves immediately.
    ///
    /// # Panics
    ///
    /// Panics if the queue holds no buckets, because the bucket index is
    /// computed as a remainder by the bucket count.
    fn request(&'_ self, shard_id: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        let [shard, _] = shard_id;

        // The remainder is strictly smaller than `buckets.len()`, which is a
        // `usize`, so narrowing it back cannot truncate.
        #[allow(clippy::cast_possible_truncation)]
        let index = (shard % (self.buckets.len() as u64)) as usize;
        let (notify, allowance) = oneshot::channel();

        Box::pin(async move {
            self.limiter.get().await;

            match self.buckets[index].send(notify) {
                Ok(()) => {
                    tracing::info!("waiting for allowance on shard {}", shard);
                    // A dropped sender is treated the same as a release.
                    let _ = allowance.await;
                }
                Err(source) => {
                    tracing::warn!("skipping, send failed with: {source:?}");
                }
            }
        })
    }
}
#[cfg(test)]
mod tests {
    // Compile-time guarantee that the queue keeps the traits callers rely on.
    static_assertions::assert_impl_all!(
        super::LargeBotQueue: std::fmt::Debug,
        super::Queue,
        Send,
        Sync
    );
}