use super::{day_limiter::DayLimiter, Queue};
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot::{self, Sender},
},
time::sleep,
};
use twilight_http::Client;
/// Queue built for single-process groups of shards that require identifying via
/// [Sharding for Large Bots].
///
/// Usage with other processes will cause inconsistencies between each process's
/// ratelimit buckets. If using multiple processes for shard groups, then refer
/// to the [module-level] documentation.
///
/// [Sharding for Large Bots]: https://discord.com/developers/docs/topics/gateway#sharding-for-very-large-bots
/// [module-level]: crate
#[derive(Debug)]
pub struct LargeBotQueue {
    // One sender per ratelimit bucket (`max_concurrency`); each feeds a
    // spawned `waiter` task that grants one identify per six seconds.
    buckets: Vec<UnboundedSender<Sender<()>>>,
    // Tracks the remaining daily identify allowance reported by Discord.
    limiter: DayLimiter,
}
impl LargeBotQueue {
    /// Create a new large bot queue.
    ///
    /// You must provide the number of buckets Discord requires your bot to
    /// connect with.
    ///
    /// The number of buckets is provided via Discord as `max_concurrency`
    /// which can be fetched with [`Client::gateway`].
    pub async fn new(buckets: usize, http: Arc<Client>) -> Self {
        // Spawn one waiter task per bucket; the stored sender is how
        // `request` hands a bucket its next pending identify.
        let queues: Vec<_> = (0..buckets)
            .map(|_| {
                let (tx, rx) = unbounded_channel();
                tokio::spawn(waiter(rx));

                tx
            })
            .collect();

        let limiter = DayLimiter::new(http)
            .await
            .expect("Getting the first session limits failed, Is network connection available?");

        // The level_enabled macro does not turn off with the dynamic tracing
        // levels. It is made for the static_max_level_xxx features and will
        // return false if you do not use those features, or if you use the
        // feature but then dynamically set a lower level.
        if tracing::level_enabled!(tracing::Level::INFO) {
            let state = limiter.0.lock().await;
            tracing::info!(
                "{}/{} identifies used before next reset in {:.2?}",
                state.current,
                state.total,
                state.next_reset
            );
        }

        Self {
            buckets: queues,
            limiter,
        }
    }
}
/// Drain identify requests for a single bucket, granting one allowance and
/// then pausing before granting the next.
async fn waiter(mut rx: UnboundedReceiver<Sender<()>>) {
    // Minimum delay between identifies within one bucket.
    const PAUSE: Duration = Duration::from_secs(6);

    loop {
        let Some(req) = rx.recv().await else {
            // All senders dropped; bucket task shuts down.
            break;
        };

        match req.send(()) {
            // Allowance delivered: ratelimit before the next grant.
            Ok(()) => sleep(PAUSE).await,
            // Receiver gone (caller dropped its future): don't burn the slot.
            Err(source) => tracing::warn!("skipping, send failed with: {source:?}"),
        }
    }
}
impl Queue for LargeBotQueue {
    /// Request to be able to identify with the gateway. This will place this
    /// request behind all other requests, and the returned future will resolve
    /// once the request has been completed.
    fn request(&'_ self, shard_id: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        // Discord assigns shards to buckets by `shard_id % max_concurrency`.
        let bucket_total = self.buckets.len() as u64;
        #[allow(clippy::cast_possible_truncation)]
        let bucket = (shard_id[0] % bucket_total) as usize;
        let (tx, rx) = oneshot::channel();

        Box::pin(async move {
            // Respect the daily identify allowance before queueing.
            self.limiter.get().await;

            match self.buckets[bucket].send(tx) {
                Ok(()) => {
                    tracing::info!("waiting for allowance on shard {}", shard_id[0]);

                    // Resolves when the bucket's waiter grants the slot; an
                    // error here only means the waiter task exited.
                    _ = rx.await;
                }
                Err(source) => {
                    tracing::warn!("skipping, send failed with: {source:?}");
                }
            }
        })
    }
}
#[cfg(test)]
mod tests {
    use super::{LargeBotQueue, Queue};
    use static_assertions::assert_impl_all;
    use std::fmt::Debug;

    // Compile-time guarantee that the queue is usable across threads and as a
    // trait object implementing `Queue`.
    assert_impl_all!(LargeBotQueue: Debug, Queue, Send, Sync);
}