mod request;
mod request_lock;
mod requester_impl;
mod settings;
mod worker;
use std::{
future::Future,
hash::{Hash, Hasher},
};
use tokio::sync::{
mpsc,
oneshot::{self},
};
use crate::{errors::AsResponseParameters, requests::Requester, types::*};
use self::{
request_lock::{channel, RequestLock},
worker::{worker, FreezeUntil, InfoMessage},
};
pub use request::{ThrottlingRequest, ThrottlingSend};
pub use settings::{Limits, Settings};
#[derive(Clone, Debug)]
pub struct Throttle<B> {
bot: B,
queue: mpsc::Sender<(ChatIdHash, RequestLock)>,
info_tx: mpsc::Sender<InfoMessage>,
}
impl<B> Throttle<B> {
pub fn new(bot: B, limits: Limits) -> (Self, impl Future<Output = ()>)
where
B: Requester + Clone,
B::Err: AsResponseParameters,
{
let settings = Settings { limits, ..<_>::default() };
Self::with_settings(bot, settings)
}
pub fn with_settings(bot: B, settings: Settings) -> (Self, impl Future<Output = ()>)
where
B: Requester + Clone,
B::Err: AsResponseParameters,
{
let (tx, rx) = mpsc::channel(settings.limits.messages_per_sec_overall as usize);
let (info_tx, info_rx) = mpsc::channel(2);
let worker = worker(settings, rx, info_rx, bot.clone());
let this = Self { bot, queue: tx, info_tx };
(this, worker)
}
pub fn new_spawn(bot: B, limits: Limits) -> Self
where
B: Requester + Clone + Send + Sync + 'static,
B::Err: AsResponseParameters,
B::GetChat: Send,
{
let (this, worker) = Self::new(bot, limits);
tokio::spawn(worker);
this
}
pub fn spawn_with_settings(bot: B, settings: Settings) -> Self
where
B: Requester + Clone + Send + Sync + 'static,
B::Err: AsResponseParameters,
B::GetChat: Send,
{
let (this, worker) = Self::with_settings(bot, settings);
tokio::spawn(worker);
this
}
pub fn inner(&self) -> &B {
&self.bot
}
pub fn into_inner(self) -> B {
self.bot
}
pub async fn limits(&self) -> Limits {
const WORKER_DIED: &str = "worker died before last `Throttle` instance";
let (tx, rx) = oneshot::channel();
self.info_tx.send(InfoMessage::GetLimits { response: tx }).await.expect(WORKER_DIED);
rx.await.expect(WORKER_DIED)
}
pub async fn set_limits(&self, new: Limits) {
let (tx, rx) = oneshot::channel();
self.info_tx.send(InfoMessage::SetLimits { new, response: tx }).await.ok();
rx.await.ok();
}
}
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq)]
enum ChatIdHash {
Id(ChatId),
ChannelUsernameHash(u64),
}
impl ChatIdHash {
fn is_channel_or_supergroup(&self) -> bool {
match self {
&Self::Id(id) => id.is_channel_or_supergroup(),
Self::ChannelUsernameHash(_) => true,
}
}
}
impl From<&ChatId> for ChatIdHash {
fn from(value: &ChatId) -> Self {
ChatIdHash::Id(*value)
}
}
impl From<&Recipient> for ChatIdHash {
fn from(value: &Recipient) -> Self {
match value {
Recipient::Id(id) => ChatIdHash::Id(*id),
Recipient::ChannelUsername(username) => {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
username.hash(&mut hasher);
let hash = hasher.finish();
ChatIdHash::ChannelUsernameHash(hash)
}
}
}
}