use std::sync::Arc;
use tokio::time::Instant;
pub struct BatchBuffer<E> {
pub(super) buffer: Vec<E>,
pub(super) last_flush: Instant,
metrics_labels: Option<Arc<[(String, String)]>>,
metrics_enabled: bool,
}
impl<E> BatchBuffer<E> {
pub fn new(initial_capacity: usize) -> Self {
Self {
buffer: Vec::with_capacity(initial_capacity),
last_flush: Instant::now(),
metrics_labels: None,
metrics_enabled: false,
}
}
pub fn with_length_gauge(
mut self,
node_id: u32,
buffer_name: &'static str,
enabled: bool,
) -> Self {
self.metrics_labels = Some(Arc::from(vec![
("node_id".to_string(), node_id.to_string()),
("buffer".to_string(), buffer_name.to_string()),
]));
self.metrics_enabled = enabled;
self
}
pub fn push(
&mut self,
request: E,
) {
self.buffer.push(request);
if self.metrics_enabled {
if let Some(ref labels) = self.metrics_labels {
metrics::gauge!("batch.buffer_length", labels.as_ref())
.set(self.buffer.len() as f64);
}
}
}
pub fn take_all(&mut self) -> Vec<E> {
self.last_flush = Instant::now();
let items = std::mem::take(&mut self.buffer);
if self.metrics_enabled {
if let Some(ref labels) = self.metrics_labels {
metrics::gauge!("batch.buffer_length", labels.as_ref()).set(0.0);
}
}
items
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn len(&self) -> usize {
self.buffer.len()
}
#[cfg(test)]
pub fn clear(&mut self) {
self.buffer.clear();
self.last_flush = Instant::now();
}
}