use crate::ircmsg::{ClientMsg, ServerMsg};
use crate::string::{Key, NoNul, User};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// A rate-limited FIFO queue of outgoing client messages.
///
/// Messages are released by `pop` only when the rate limit allows it.
/// The queue can optionally attach generated labels to messages (see `use_labeler`)
/// and rewrite or drop queued messages in response to server messages (see `adjust`).
pub struct Queue {
/// Messages waiting to be sent, oldest at the front.
queue: VecDeque<ClientMsg<'static>>,
/// Amount by which `timepoint` is advanced each time a message is released.
delay: Duration,
/// Burst allowance: how far `timepoint` may lie ahead of the current time
/// while messages are still released immediately.
sub: Duration,
/// Rate-limit bookkeeping instant that `pop` compares against the current time.
timepoint: Instant,
/// Optional generator for the label values attached by `QueueEditGuard::push_labeled`.
labeler: Option<Box<dyn FnMut() -> NoNul<'static> + Send>>,
/// Optional hook consulted by `adjust` to rewrite or drop queued messages.
adjuster: Option<Box<dyn Adjuster>>,
}
impl std::fmt::Debug for Queue {
    /// Formats the queue's state for diagnostics.
    ///
    /// The labeler and the adjuster are boxed trait objects with no `Debug` bound,
    /// so each is reported as a boolean telling whether one is installed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Queue")
            .field("queue", &self.queue)
            .field("delay", &self.delay)
            .field("sub", &self.sub)
            .field("timepoint", &self.timepoint)
            .field("labeler", &self.labeler.is_some())
            .field("adjuster", &self.adjuster.is_some())
            .finish()
    }
}
impl Default for Queue {
fn default() -> Self {
Self::new()
}
}
impl FromIterator<ClientMsg<'static>> for Queue {
fn from_iter<T: IntoIterator<Item = ClientMsg<'static>>>(iter: T) -> Self {
Self::from_queue(iter.into_iter().collect())
}
}
impl From<Vec<ClientMsg<'static>>> for Queue {
fn from(value: Vec<ClientMsg<'static>>) -> Self {
Self::from_queue(value.into())
}
}
impl From<VecDeque<ClientMsg<'static>>> for Queue {
fn from(value: VecDeque<ClientMsg<'static>>) -> Self {
Self::from_queue(value)
}
}
impl Queue {
    /// Creates an empty queue with the default settings.
    ///
    /// The defaults are a 2 second delay, an 8 second burst allowance,
    /// no labeler and no adjuster.
    pub fn new() -> Self {
        Self::from_queue(VecDeque::with_capacity(4))
    }

    /// Wraps an existing deque of messages with the default settings:
    /// a 2 second delay, an 8 second burst allowance, no labeler and no adjuster.
    fn from_queue(queue: VecDeque<ClientMsg<'static>>) -> Self {
        Queue {
            queue,
            delay: Duration::from_secs(2),
            sub: Duration::from_secs(8),
            timepoint: Instant::now(),
            labeler: None,
            adjuster: None,
        }
    }

    /// Returns `true` if no messages are queued.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns the number of queued messages.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Sets the rate limit.
    ///
    /// `delay` is how far the internal timepoint advances for each released message.
    /// `burst` scales `delay` into the burst allowance (`delay * burst`, saturating):
    /// while the timepoint is no further than that allowance ahead of the current time,
    /// [`pop`](Queue::pop) releases messages without waiting.
    ///
    /// Calling this also moves the timepoint to `now + allowance + delay`, so the queue
    /// behaves as if the burst had just been used up: the next message becomes eligible
    /// `delay` after this call. If that instant cannot be represented, the timepoint is
    /// set to the current time instead.
    pub fn set_rate_limit(&mut self, delay: Duration, burst: u32) -> &mut Self {
        self.delay = delay;
        self.sub = delay.saturating_mul(burst);
        let now = Instant::now();
        self.timepoint = now.checked_add(self.sub.saturating_add(delay)).unwrap_or(now);
        self
    }

    /// Removes and returns the front message if the rate limit allows sending it now.
    ///
    /// `timeout_fn` is called whenever `None` is returned:
    /// - with `None` if the queue is empty;
    /// - with `Some(wait)` if the front message must wait `wait` longer. The message
    ///   stays at the front of the queue in that case.
    ///
    /// `timeout_fn` is not called when a message is returned.
    pub fn pop(
        &mut self,
        timeout_fn: impl FnOnce(Option<Duration>),
    ) -> Option<ClientMsg<'static>> {
        if self.queue.is_empty() {
            timeout_fn(None);
            return None;
        }
        // Read the clock once so the eligibility check and the bookkeeping below agree.
        let now = Instant::now();
        let wait = self.timepoint.saturating_duration_since(now).saturating_sub(self.sub);
        if !wait.is_zero() {
            timeout_fn(Some(wait));
            return None;
        }
        // `Instant + Duration` panics when the result is unrepresentable, which an
        // extreme `delay` accepted by `set_rate_limit` could trigger. Use the same
        // checked arithmetic and fallback style as `set_rate_limit` instead.
        let base = self.timepoint.max(now);
        self.timepoint = base.checked_add(self.delay).unwrap_or(base);
        self.queue.pop_front()
    }

    /// Lets the installed adjuster, if any, react to a server message.
    ///
    /// If the adjuster's `should_adjust` returns `true` for `msg`, its `update` is run
    /// on every queued message in order, and messages for which `update` returns
    /// `false` are removed from the queue.
    pub fn adjust(&mut self, msg: &ServerMsg<'_>) {
        if let Some(adj) = self.adjuster.as_mut() {
            if adj.should_adjust(msg) {
                self.queue.retain_mut(|cmsg| adj.update(cmsg));
            }
        }
    }

    /// Installs `adjuster`, replacing any previously installed one.
    pub fn use_adjuster(&mut self, adjuster: impl Adjuster + 'static) -> &mut Self {
        self.adjuster = Some(Box::new(adjuster));
        self
    }

    /// Removes the installed adjuster, if any.
    pub fn use_no_adjuster(&mut self) -> &mut Self {
        self.adjuster = None;
        self
    }

    /// Installs `labeler`, replacing any previously installed one.
    ///
    /// The labeler is called once per [`QueueEditGuard::push_labeled`] to produce the
    /// value of the `label` tag attached to the pushed message.
    pub fn use_labeler(
        &mut self,
        labeler: impl FnMut() -> NoNul<'static> + 'static + Send,
    ) -> &mut Self {
        self.labeler = Some(Box::new(labeler));
        self
    }

    /// Installs a labeler that derives labels from a `u32` counter.
    ///
    /// The counter starts at 0 and is incremented before each label is produced, so the
    /// first label is derived from 1. It wraps around on overflow.
    pub fn use_labeler_default(&mut self) -> &mut Self {
        let mut id = 0u32;
        self.use_labeler(move || {
            id = id.wrapping_add(1);
            User::from_id(id).into()
        })
    }

    /// Removes the installed labeler, if any.
    pub fn use_no_labeler(&mut self) -> &mut Self {
        self.labeler = None;
        self
    }

    /// Returns `true` if a labeler is installed.
    pub fn is_using_labeler(&self) -> bool {
        self.labeler.is_some()
    }

    /// Returns a guard for appending messages to the queue.
    ///
    /// The guard remembers the queue's length at the time of this call. Its
    /// `len`, `is_empty` and `clear` only consider messages added after that point.
    pub fn edit(&mut self) -> QueueEditGuard<'_> {
        let orig_len = self.queue.len();
        QueueEditGuard { queue: self, orig_len }
    }

    /// Discards every queued message. All settings are left untouched.
    pub fn clear(&mut self) {
        self.queue.clear();
    }

    /// Resets the queue's transient state.
    ///
    /// Discards all queued messages, removes the labeler, moves the rate-limit
    /// timepoint to the current time, and calls `reset` on the adjuster if one is
    /// installed. The configured delay, the burst allowance and the adjuster itself
    /// are kept.
    pub fn reset(&mut self) {
        self.clear();
        self.use_no_labeler();
        self.timepoint = Instant::now();
        if let Some(adjuster) = self.adjuster.as_mut() {
            adjuster.reset();
        }
    }
}
/// Handle for appending messages to a [`Queue`], obtained from `Queue::edit`.
///
/// Holds the queue mutably for its lifetime and tracks which messages were added
/// through it.
pub struct QueueEditGuard<'a> {
/// The queue being edited.
queue: &'a mut Queue,
/// Length of the queue's message list when this guard was created.
orig_len: usize,
}
impl QueueEditGuard<'_> {
    /// Appends `msg` to the back of the queue.
    pub fn push(&mut self, msg: ClientMsg<'static>) {
        let pending = &mut self.queue.queue;
        pending.push_back(msg);
    }

    /// Appends `msg` to the back of the queue, labeling it if a labeler is installed.
    ///
    /// With a labeler, a fresh label is generated, inserted into the message's tags
    /// under the `label` key, and returned. Without one, the message is pushed
    /// unchanged and `None` is returned.
    pub fn push_labeled(&mut self, mut msg: ClientMsg<'static>) -> Option<NoNul<'static>> {
        let label = match self.queue.labeler.as_deref_mut() {
            Some(make_label) => {
                let label = make_label();
                msg.tags.edit().insert_pair(Key::from_str("label"), label.clone());
                Some(label)
            }
            None => None,
        };
        self.push(msg);
        label
    }

    /// Returns `true` if the underlying queue has a labeler installed.
    pub fn is_using_labeler(&self) -> bool {
        let labeler = &self.queue.labeler;
        labeler.is_some()
    }

    /// Returns `true` if no messages added since this guard was created remain.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns how many messages the queue holds beyond its length at guard creation.
    pub fn len(&self) -> usize {
        let total = self.queue.queue.len();
        total - self.orig_len
    }

    /// Drops the messages added since this guard was created, restoring the queue to
    /// the length it had at that point.
    pub fn clear(&mut self) -> &mut Self {
        let keep = self.orig_len;
        self.queue.queue.truncate(keep);
        self
    }

    /// Returns a nested guard whose starting point is the queue's current length.
    pub fn edit(&mut self) -> QueueEditGuard<'_> {
        let orig_len = self.queue.queue.len();
        QueueEditGuard { queue: &mut *self.queue, orig_len }
    }
}
impl Extend<ClientMsg<'static>> for Queue {
    /// Appends every message from `iter` to the back of the queue, in iteration order.
    /// No labels are attached.
    fn extend<T: IntoIterator<Item = ClientMsg<'static>>>(&mut self, iter: T) {
        Extend::extend(&mut self.queue, iter);
    }
}
/// Hook for rewriting or dropping queued messages in response to server messages.
///
/// `Queue::adjust` first calls [`should_adjust`](Adjuster::should_adjust) with the server
/// message. If that returns `true`, [`update`](Adjuster::update) is called on every queued
/// message, and messages for which it returns `false` are removed from the queue.
pub trait Adjuster: Send {
/// Returns whether the queued messages should be passed through
/// [`update`](Adjuster::update) in response to `msg`.
///
/// The default implementation ignores `msg` and always returns `true`.
// The default body never reads `msg`, hence the lint exemption.
#[allow(unused_variables)]
fn should_adjust(&mut self, msg: &ServerMsg<'_>) -> bool {
true
}
/// Inspects and possibly modifies one queued message.
///
/// Returns `true` to keep the message in the queue and `false` to have it removed.
fn update(&mut self, msg: &mut ClientMsg<'_>) -> bool;
/// Resets the adjuster's internal state. Called by `Queue::reset`.
fn reset(&mut self);
}
/// An [`Adjuster`] that forwards to a list of other adjusters.
#[derive(Default)]
pub struct MultiAdjuster {
/// Each adjuster paired with the result of its most recent `should_adjust` call
/// (`false` until it has been asked, and again after a reset).
adjusters: Vec<(Box<dyn Adjuster>, bool)>,
}
impl MultiAdjuster {
    /// Creates a `MultiAdjuster` containing no adjusters.
    pub fn new() -> MultiAdjuster {
        let adjusters = Vec::new();
        Self { adjusters }
    }

    /// Appends `adjuster` to the list. It starts out flagged as not needing to adjust.
    pub fn add<T: Adjuster + 'static>(&mut self, adjuster: T) {
        let boxed: Box<dyn Adjuster> = Box::new(adjuster);
        self.adjusters.push((boxed, false));
    }

    /// Removes every contained adjuster.
    pub fn clear(&mut self) {
        let list = &mut self.adjusters;
        list.clear();
    }

    /// Returns `true` if there are no contained adjusters.
    pub fn is_empty(&self) -> bool {
        let list = &self.adjusters;
        list.is_empty()
    }

    /// Returns the number of contained adjusters.
    pub fn len(&self) -> usize {
        let list = &self.adjusters;
        list.len()
    }
}
impl FromIterator<Box<dyn Adjuster>> for MultiAdjuster {
    /// Collects boxed adjusters in iteration order, each initially flagged as not
    /// needing to adjust.
    fn from_iter<I: IntoIterator<Item = Box<dyn Adjuster>>>(iter: I) -> Self {
        let adjusters = iter.into_iter().map(|adjuster| (adjuster, false)).collect();
        Self { adjusters }
    }
}
impl Adjuster for MultiAdjuster {
    /// Asks every contained adjuster whether it wants to adjust for `msg`, records each
    /// answer, and returns `true` if at least one said yes.
    ///
    /// Every adjuster is consulted, even after one has already answered `true`.
    fn should_adjust(&mut self, msg: &ServerMsg<'_>) -> bool {
        self.adjusters.iter_mut().fold(false, |any, (adj, flag)| {
            *flag = adj.should_adjust(msg);
            any | *flag
        })
    }

    /// Runs `update` for each adjuster whose most recent `should_adjust` answer was
    /// `true`, in insertion order.
    ///
    /// Returns `false` if any of them returned `false`. The remaining flagged
    /// adjusters are still run in that case.
    fn update(&mut self, msg: &mut ClientMsg<'_>) -> bool {
        let mut keep = true;
        for (adj, flagged) in self.adjusters.iter_mut() {
            if !*flagged {
                continue;
            }
            if !adj.update(msg) {
                keep = false;
            }
        }
        keep
    }

    /// Resets every contained adjuster and clears its recorded `should_adjust` answer.
    fn reset(&mut self) {
        self.adjusters.iter_mut().for_each(|(adj, flagged)| {
            adj.reset();
            *flagged = false;
        });
    }
}