#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![warn(
    clippy::missing_const_for_fn,
    clippy::pedantic,
    missing_docs,
    unsafe_code
)]
#![allow(
    clippy::module_name_repetitions,
    clippy::must_use_candidate,
    clippy::unnecessary_wraps
)]

#[cfg(feature = "twilight-http")]
mod day_limiter;
#[cfg(feature = "twilight-http")]
mod large_bot_queue;
#[cfg(feature = "twilight-http")]
pub use large_bot_queue::LargeBotQueue;

use std::{
    fmt::Debug,
    future::{self, Future},
    pin::Pin,
    time::Duration,
};
use tokio::{
    sync::{
        mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
        oneshot::{self, Sender},
    },
    time::sleep,
};

/// Queue for shards to request the ability to initialize new sessions with the
/// gateway.
///
/// This will usually only need to be implemented when you have a multi-process
/// sharding setup. Refer to the [module-level] documentation for more
/// information.
///
/// [module-level]: crate
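///
/// # Examples
///
/// A minimal sketch of a custom implementation; the body is illustrative
/// only (a real implementation would resolve only once some external
/// coordinator grants permission):
///
/// ```ignore
/// use std::{fmt::Debug, future::Future, pin::Pin};
///
/// #[derive(Debug)]
/// struct MyQueue;
///
/// impl Queue for MyQueue {
///     fn request<'a>(
///         &'a self,
///         [id, total]: [u64; 2],
///     ) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
///         Box::pin(async move {
///             // Wait here for an external coordinator (e.g. a message
///             // broker) to allow shard `id` of `total` to identify.
///         })
///     }
/// }
/// ```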
pub trait Queue: Debug + Send + Sync {
    /// A shard has requested the ability to request a session initialization
    /// with the gateway.
    ///
    /// The returned future must resolve only when the shard can initiate the
    /// session.
    fn request<'a>(&'a self, shard_id: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}

/// A local, in-process implementation of a [`Queue`] which manages the
/// connection attempts of one or more shards.
///
/// The queue will take incoming requests and then queue them, releasing one of
/// the requests every 6 seconds. The queue is necessary because there's a
/// ratelimit on how often shards can initiate sessions.
///
/// Shard queues usually won't need to be handled manually, as the gateway has
/// built-in queueing when managing multiple shards.
///
/// # When not to use this
///
/// This queue implementation is "local", meaning it's intended to be used if
/// you manage shards only in this process. If you run shards in multiple
/// different processes (do you utilize microservices a lot?), then you **must
/// not** use this implementation. Shards across multiple processes may
/// create new sessions at the same time, which would exceed the gateway's
/// ratelimit on session initialization.
///
/// It should also not be used for very large sharding; for that, the
/// [`LargeBotQueue`] should be used instead.
///
/// If you can't use this, look into an alternative implementation of the
/// [`Queue`], such as the [`gateway-queue`] broker.
///
/// [`gateway-queue`]: https://github.com/twilight-rs/gateway-queue
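///
/// # Examples
///
/// A minimal sketch of sharing one queue between two shards (crate import
/// path assumed, inside a Tokio runtime):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let queue = Arc::new(LocalQueue::new());
///
/// // Each shard awaits its turn before identifying; allowances are
/// // released one at a time, at most once every 6 seconds.
/// queue.request([0, 2]).await;
/// queue.request([1, 2]).await;
/// ```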
#[derive(Clone, Debug)]
pub struct LocalQueue(UnboundedSender<Sender<()>>);

impl Default for LocalQueue {
    fn default() -> Self {
        Self::new()
    }
}

impl LocalQueue {
    /// Creates a new local queue.
    pub fn new() -> Self {
        let (tx, rx) = unbounded_channel();

        tokio::spawn(waiter(rx));

        Self(tx)
    }
}

async fn waiter(mut rx: UnboundedReceiver<Sender<()>>) {
    const DUR: Duration = Duration::from_secs(6);

    while let Some(req) = rx.recv().await {
        if let Err(source) = req.send(()) {
            tracing::warn!("skipping, send failed: {source:?}");
        } else {
            sleep(DUR).await;
        }
    }
}

impl Queue for LocalQueue {
    /// Request to be able to identify with the gateway. This will place this
    /// request behind all other requests, and the returned future will resolve
    /// once the request has been completed.
    fn request(&'_ self, [id, total]: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(async move {
            let (tx, rx) = oneshot::channel();

            if let Err(source) = self.0.send(tx) {
                tracing::warn!("skipping, send failed: {source:?}");

                return;
            }

            tracing::info!("shard {id}/{total} waiting for allowance");

            _ = rx.await;
        })
    }
}

/// An implementation of [`Queue`] that instantly allows requests.
///
/// Useful when running behind a proxy gateway. Running without a
/// functional queue **will** get you ratelimited.
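///
/// # Examples
///
/// A minimal sketch (inside a Tokio runtime, crate import path assumed):
///
/// ```ignore
/// let queue = NoOpQueue;
///
/// // Resolves immediately; no ratelimiting is applied locally.
/// queue.request([0, 1]).await;
/// ```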
#[derive(Debug)]
pub struct NoOpQueue;

impl Queue for NoOpQueue {
    fn request(&'_ self, [_id, _total]: [u64; 2]) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
        Box::pin(future::ready(()))
    }
}

#[cfg(test)]
mod tests {
    use super::{LocalQueue, NoOpQueue, Queue};
    use static_assertions::{assert_impl_all, assert_obj_safe};
    use std::fmt::Debug;

    assert_impl_all!(LocalQueue: Clone, Debug, Queue, Send, Sync);
    assert_impl_all!(NoOpQueue: Debug, Queue, Send, Sync);
    assert_impl_all!(dyn Queue: Debug, Send, Sync);
    assert_obj_safe!(Queue);
}