use crossbeam_channel::Sender;
use std::sync::{Arc, Mutex, Weak};
pub(crate) const BATCH_SIZE: usize = 64;
pub(crate) const FLUSH_INTERVAL_MS: u64 = 50;
pub(crate) const FLUSH_INTERVAL_NS: u64 = FLUSH_INTERVAL_MS * 1_000_000;
pub(crate) trait BatchedMeasurement: Sized + Send {
fn elapsed_since_start_ns(&self) -> u64;
fn fetch_sender() -> Option<Sender<Vec<Self>>>;
fn is_flush_boundary(&self) -> bool {
false
}
}
pub(crate) struct MeasurementBatch<M: BatchedMeasurement> {
measurements: Vec<M>,
last_flush_elapsed_ns: u64,
sender: Option<Sender<Vec<M>>>,
}
impl<M: BatchedMeasurement> MeasurementBatch<M> {
pub(crate) fn new() -> Self {
Self {
measurements: Vec::with_capacity(BATCH_SIZE),
last_flush_elapsed_ns: 0,
sender: None,
}
}
pub(crate) fn add(&mut self, measurement: M) {
if self.sender.is_none() {
self.sender = M::fetch_sender();
}
if self.sender.is_none() {
return;
}
let elapsed_since_start_ns = measurement.elapsed_since_start_ns();
let is_boundary = measurement.is_flush_boundary();
self.measurements.push(measurement);
let should_flush = is_boundary
|| self.measurements.len() >= BATCH_SIZE
|| elapsed_since_start_ns.saturating_sub(self.last_flush_elapsed_ns)
>= FLUSH_INTERVAL_NS;
if should_flush {
self.flush();
}
}
pub(crate) fn flush(&mut self) {
if self.measurements.is_empty() {
return;
}
let sender = self.sender.as_ref().expect("Sender must exist");
if let Some(last) = self.measurements.last() {
self.last_flush_elapsed_ns = last.elapsed_since_start_ns();
}
let batch = std::mem::replace(&mut self.measurements, Vec::with_capacity(BATCH_SIZE));
let _ = sender.send(batch);
}
}
impl<M: BatchedMeasurement> Drop for MeasurementBatch<M> {
fn drop(&mut self) {
self.flush();
}
}
pub(crate) struct BatchRegistry<M: BatchedMeasurement> {
batches: Mutex<Vec<Weak<Mutex<MeasurementBatch<M>>>>>,
}
impl<M: BatchedMeasurement> BatchRegistry<M> {
pub(crate) const fn new() -> Self {
Self {
batches: Mutex::new(Vec::new()),
}
}
fn register(&self, batch: &Arc<Mutex<MeasurementBatch<M>>>) {
if let Ok(mut batches) = self.batches.lock() {
batches.push(Arc::downgrade(batch));
}
}
pub(crate) fn flush_all(&self) {
if let Ok(mut batches) = self.batches.lock() {
batches.retain(|weak| match weak.upgrade() {
Some(batch) => {
if let Ok(mut batch) = batch.lock() {
batch.flush();
}
true
}
None => false,
});
}
}
}
pub(crate) fn register_thread_batch<M: BatchedMeasurement>(
registry: &'static BatchRegistry<M>,
) -> Arc<Mutex<MeasurementBatch<M>>> {
let batch = Arc::new(Mutex::new(MeasurementBatch::new()));
registry.register(&batch);
batch
}