#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![deny(
clippy::all,
clippy::missing_const_for_fn,
clippy::pedantic,
future_incompatible,
missing_docs,
nonstandard_style,
rust_2018_idioms,
rustdoc::broken_intra_doc_links,
unsafe_code,
unused
)]
#![allow(
clippy::module_name_repetitions,
clippy::must_use_candidate,
clippy::unnecessary_wraps,
clippy::used_underscore_binding
)]
#![doc = include_str!("../README.md")]
#[cfg(feature = "twilight-http")]
mod day_limiter;
#[cfg(feature = "twilight-http")]
mod large_bot_queue;
#[cfg(feature = "twilight-http")]
pub use large_bot_queue::LargeBotQueue;
use std::{
fmt::Debug,
future::{self, Future},
pin::Pin,
time::Duration,
};
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot::{self, Sender},
},
time::sleep,
};
/// Queue that shards ask for an allowance before they proceed.
///
/// Implementations decide how long a request has to wait; the future returned
/// by [`Queue::request`] completes once the request has been handled.
///
/// The trait is object safe, and its `Debug + Send + Sync` supertraits let a
/// `dyn Queue` be shared between threads and tasks.
pub trait Queue: Debug + Send + Sync {
    /// Request an allowance for a shard.
    ///
    /// `shard_id` is the pair `[id, total]`: the shard's own ID followed by
    /// the total number of shards.
    ///
    /// The returned future borrows the queue for as long as it is alive and
    /// resolves to `()` when the caller may continue.
    fn request(&self, shard_id: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>;
}
/// Queue that serves requests one at a time from a background task.
///
/// Each request is forwarded over an unbounded channel to a task spawned by
/// [`LocalQueue::new`], which answers requests in the order it receives them
/// and spaces them out with a fixed delay.
///
/// Clones hold a sender to the same channel, so they are all served by the
/// same background task.
#[derive(Clone, Debug)]
pub struct LocalQueue(UnboundedSender<Sender<()>>);
impl Default for LocalQueue {
fn default() -> Self {
Self::new()
}
}
impl LocalQueue {
    /// Creates a new local queue and spawns the background task serving it.
    ///
    /// The queue keeps the sending half of an unbounded channel; the receiving
    /// half is moved into the spawned task, which stops once the channel is
    /// closed.
    ///
    /// # Panics
    ///
    /// Panics if called from outside of a Tokio runtime, because the
    /// background task is started with `tokio::spawn`.
    pub fn new() -> Self {
        let (sender, receiver) = unbounded_channel();

        // The join handle is dropped on purpose: the task is detached and
        // ends on its own when the channel closes.
        tokio::spawn(waiter(receiver));

        Self(sender)
    }
}
/// Background task that grants queued requests one at a time.
///
/// For every sender received over `rx` the task signals the waiting
/// requester and then pauses for `DUR` (6 seconds) before serving the next
/// request. A request whose receiving half has already been dropped is
/// skipped without pausing, since nothing was actually granted.
///
/// Returns once `rx` yields `None`, i.e. when the channel has been closed.
async fn waiter(mut rx: UnboundedReceiver<Sender<()>>) {
    const DUR: Duration = Duration::from_secs(6);

    while let Some(req) = rx.recv().await {
        if let Err(source) = req.send(()) {
            // The requester went away before it could be served. No allowance
            // was handed out, so move straight on to the next request instead
            // of making everyone behind it wait for a slot nobody used.
            tracing::warn!("skipping, send failed: {source:?}");
            continue;
        }

        // An allowance was delivered; hold back the next one for `DUR`.
        sleep(DUR).await;
    }
}
impl Queue for LocalQueue {
    /// Enqueues a request with the background task and waits for its answer.
    ///
    /// A fresh oneshot channel is created per request; its sending half is
    /// handed to the background task and the receiving half is awaited.
    ///
    /// If the request cannot be enqueued, a warning is logged and the future
    /// completes immediately. An error from the oneshot receiver is ignored,
    /// so the future resolves whether or not an answer was actually sent.
    fn request(&'_ self, [id, total]: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async move {
            let (allowance_tx, allowance_rx) = oneshot::channel();

            match self.0.send(allowance_tx) {
                Ok(()) => {
                    tracing::info!("shard {id}/{total} waiting for allowance");

                    // Whether the sender fired or was dropped, waiting is over.
                    let _ = allowance_rx.await;
                }
                Err(source) => {
                    tracing::warn!("skipping, send failed: {source:?}");
                }
            }
        })
    }
}
/// Queue that never makes its callers wait.
///
/// Its [`Queue::request`] implementation returns a future that is already
/// complete, so no spacing between requests is applied by this type.
#[derive(Debug)]
pub struct NoOpQueue;
impl Queue for NoOpQueue {
    /// Returns a future that is already complete; the shard ID is ignored.
    fn request(&self, _shard_id: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        let granted = future::ready(());

        Box::pin(granted)
    }
}
#[cfg(test)]
mod tests {
    use super::{LocalQueue, NoOpQueue, Queue};
    use static_assertions::{assert_impl_all, assert_obj_safe};
    use std::fmt::Debug;

    // The trait has to remain usable as a trait object, and that trait
    // object has to be debuggable and shareable across threads.
    assert_obj_safe!(Queue);
    assert_impl_all!(dyn Queue: Debug, Send, Sync);

    // Both provided queues implement the trait and its supertraits;
    // `LocalQueue` is additionally cloneable.
    assert_impl_all!(NoOpQueue: Debug, Queue, Send, Sync);
    assert_impl_all!(LocalQueue: Clone, Debug, Queue, Send, Sync);
}