/// Test-only event queue built on a bounded `mpsc` channel.
///
/// The queue owns both halves of the channel and keeps its own
/// [`QueueMetrics`] bookkeeping alongside them.
#[cfg(test)]
#[derive(Debug)]
pub struct BoundedEventQueue<T> {
    /// Sending half of the bounded channel.
    sender: mpsc::SyncSender<T>,
    /// Receiving half of the bounded channel.
    receiver: mpsc::Receiver<T>,
    /// Counters maintained by the queue's own methods.
    metrics: QueueMetrics,
}
/// Bookkeeping counters for a queue; every counter defaults to zero.
#[cfg(test)]
#[derive(Clone, Debug, Default)]
pub struct QueueMetrics {
    /// Number of events currently counted as sitting in the queue.
    pub depth: usize,
    /// Number of times a non-blocking send was rejected because the queue was full.
    pub backpressure_count: usize,
    /// Largest value `depth` has been recorded at.
    pub max_depth: usize,
}
#[cfg(test)]
impl<T: std::fmt::Debug> BoundedEventQueue<T> {
    /// Creates a queue using the configuration returned by `get_queue_config()`.
    pub fn new() -> Self {
        Self::with_config(get_queue_config())
    }

    /// Creates a queue whose channel is bounded by `config.capacity`.
    ///
    /// All metrics start at their default (zero) values.
    pub fn with_config(config: QueueConfig) -> Self {
        let (sender, receiver) = mpsc::sync_channel(config.capacity);
        Self {
            sender,
            receiver,
            metrics: QueueMetrics::default(),
        }
    }

    /// Enqueues `event` and returns the queue with `depth` incremented and
    /// `max_depth` raised if a new high-water mark was reached.
    ///
    /// # Panics
    ///
    /// Panics if the channel is full or if the receiving half has been dropped.
    ///
    /// A blocking `SyncSender::send` is deliberately not used here: the only
    /// receiver is owned by the very queue that is moved into this call, so
    /// nothing could ever drain a full channel and the call would hang forever.
    /// Use [`Self::try_send`] when a full queue should be counted as
    /// backpressure instead of treated as an error.
    pub fn send(mut self, event: T) -> Self {
        match self.sender.try_send(event) {
            Ok(()) => {
                self.metrics.depth = self.metrics.depth.saturating_add(1);
                self.metrics.max_depth = self.metrics.max_depth.max(self.metrics.depth);
                self
            }
            Err(mpsc::TrySendError::Full(event)) => {
                panic!("Queue full, blocking send would deadlock: {:?}", event);
            }
            Err(mpsc::TrySendError::Disconnected(event)) => {
                panic!("Receiver dropped unexpectedly: {:?}", event);
            }
        }
    }

    /// Attempts to enqueue `event` without blocking.
    ///
    /// On success `depth` is incremented and `max_depth` updated. If the
    /// channel is full the event is discarded and `backpressure_count` is
    /// incremented instead.
    ///
    /// # Panics
    ///
    /// Panics if the receiving half of the channel has been dropped.
    pub fn try_send(mut self, event: T) -> Self {
        match self.sender.try_send(event) {
            Ok(()) => {
                self.metrics.depth = self.metrics.depth.saturating_add(1);
                self.metrics.max_depth = self.metrics.max_depth.max(self.metrics.depth);
                self
            }
            Err(mpsc::TrySendError::Full(_)) => {
                self.metrics.backpressure_count =
                    self.metrics.backpressure_count.saturating_add(1);
                self
            }
            Err(mpsc::TrySendError::Disconnected(event)) => {
                panic!("Try send failed: {:?}", event);
            }
        }
    }

    /// Receives the next event, blocking until one is available.
    ///
    /// Returns the queue together with the receive result; `depth` is
    /// decremented (saturating at zero) only when an event was received.
    ///
    /// NOTE(review): the queue keeps its own sender alive, so calling this on
    /// an empty queue presumably blocks indefinitely unless some other holder
    /// of a sender delivers an event. Checking [`Self::is_empty`] first is advisable.
    pub fn recv(mut self) -> (Self, Result<T, mpsc::RecvError>) {
        let outcome = self.receiver.recv();
        if outcome.is_ok() {
            self.metrics.depth = self.metrics.depth.saturating_sub(1);
        }
        (self, outcome)
    }

    /// Returns the queue's current metrics.
    #[must_use]
    pub const fn metrics(&self) -> &QueueMetrics {
        &self.metrics
    }

    /// Returns the number of events currently counted as queued.
    #[must_use]
    pub const fn depth(&self) -> usize {
        self.metrics.depth
    }

    /// Returns `true` when the tracked depth is zero.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        self.depth() == 0
    }

    /// Discards every pending event and resets `depth` to zero.
    ///
    /// `backpressure_count` and `max_depth` are left untouched.
    pub fn clear(mut self) -> Self {
        // Drain without blocking; stops at the first empty/disconnected result.
        while self.receiver.try_recv().is_ok() {}
        self.metrics.depth = 0;
        self
    }

    /// Resets `backpressure_count` and `max_depth` to zero while keeping the
    /// current `depth`, since pending events are still in the channel.
    pub fn reset_metrics(mut self) -> Self {
        self.metrics = QueueMetrics {
            depth: self.metrics.depth,
            ..QueueMetrics::default()
        };
        self
    }
}
#[cfg(test)]
impl<T> Default for BoundedEventQueue<T>
where
    T: std::fmt::Debug,
{
    /// Equivalent to [`BoundedEventQueue::new`].
    fn default() -> Self {
        BoundedEventQueue::<T>::new()
    }
}