hotpath 0.17.0

One profiler for CPU, time, memory, and async code — quickly find and debug performance bottlenecks.
Documentation
use crossbeam_channel::Sender;
use std::sync::{Arc, Mutex, PoisonError, Weak};

/// Number of buffered measurements at which a batch is flushed; also the
/// initial capacity of each batch buffer.
pub(crate) const BATCH_SIZE: usize = 64;
/// Time, in milliseconds, after the last flush at which the next added
/// measurement triggers a flush. Only checked when a measurement is added.
pub(crate) const FLUSH_INTERVAL_MS: u64 = 50;
/// [`FLUSH_INTERVAL_MS`] expressed in nanoseconds, for comparison against
/// measurement timestamps.
pub(crate) const FLUSH_INTERVAL_NS: u64 = FLUSH_INTERVAL_MS * 1_000_000;

/// A measurement that can be buffered in a [`MeasurementBatch`] and delivered
/// to the worker in batches over a channel.
pub(crate) trait BatchedMeasurement: Sized + Send {
    /// Timestamp of this measurement in nanoseconds, used to decide when a
    /// batch is old enough to flush. NOTE(review): "start" is presumably the
    /// profiler's start time; confirm against implementors.
    fn elapsed_since_start_ns(&self) -> u64;
    /// Returns the sender that batches of this measurement type are sent on,
    /// or `None` if there is nowhere to send them (in which case
    /// [`MeasurementBatch::add`] drops the measurement).
    fn fetch_sender() -> Option<Sender<Vec<Self>>>;

    /// Returns `true` for lifecycle events (e.g. `Created`) that must reach the
    /// worker before the data events they gate.
    ///
    /// Adding such a measurement forces an immediate flush, so per-thread
    /// batching cannot deliver a data event ahead of the entry that establishes
    /// its slot. Defaults to `false`.
    fn is_flush_boundary(&self) -> bool {
        false
    }
}

/// Per-thread buffer that accumulates measurements and sends them on in
/// batches, flushing on size, elapsed time, or flush-boundary events.
pub(crate) struct MeasurementBatch<M: BatchedMeasurement> {
    /// Measurements buffered since the last flush.
    measurements: Vec<M>,
    /// `elapsed_since_start_ns` of the last measurement in the most recent
    /// flush; `0` before the first flush.
    last_flush_elapsed_ns: u64,
    /// Channel sender, fetched by `add` while this is `None`.
    sender: Option<Sender<Vec<M>>>,
}

impl<M: BatchedMeasurement> MeasurementBatch<M> {
    /// Creates an empty batch with no sender attached.
    ///
    /// The sender is fetched lazily by [`add`](Self::add), so a batch can be
    /// created before [`BatchedMeasurement::fetch_sender`] has anything to return.
    pub(crate) fn new() -> Self {
        Self {
            measurements: Vec::with_capacity(BATCH_SIZE),
            last_flush_elapsed_ns: 0,
            sender: None,
        }
    }

    /// Buffers `measurement`, flushing the batch if a flush condition is met.
    ///
    /// If no sender is cached, one is requested via
    /// [`BatchedMeasurement::fetch_sender`]; when none is available the
    /// measurement is dropped. The batch is flushed right after the push when:
    /// - the measurement is a flush boundary
    ///   ([`BatchedMeasurement::is_flush_boundary`]),
    /// - the buffer holds [`BATCH_SIZE`] or more measurements, or
    /// - the measurement's timestamp is at least [`FLUSH_INTERVAL_NS`] past the
    ///   last flush. This is only checked here, so an idle batch is never
    ///   flushed by time alone.
    pub(crate) fn add(&mut self, measurement: M) {
        if self.sender.is_none() {
            self.sender = M::fetch_sender();
        }

        if self.sender.is_none() {
            return;
        }

        let elapsed_since_start_ns = measurement.elapsed_since_start_ns();
        let is_boundary = measurement.is_flush_boundary();
        self.measurements.push(measurement);

        // `saturating_sub` guards against a timestamp earlier than the last flush.
        let should_flush = is_boundary
            || self.measurements.len() >= BATCH_SIZE
            || elapsed_since_start_ns.saturating_sub(self.last_flush_elapsed_ns)
                >= FLUSH_INTERVAL_NS;

        if should_flush {
            self.flush();
        }
    }

    /// Sends every buffered measurement as a single batch.
    ///
    /// Does nothing when the buffer is empty. If the send fails (for a
    /// crossbeam channel this only happens once every receiver is gone), the
    /// batch is discarded. This value then returns to its freshly constructed
    /// state: no cached sender and a flush clock of `0`. The next
    /// [`add`](Self::add) therefore asks [`BatchedMeasurement::fetch_sender`]
    /// again, instead of buffering into a dead channel forever.
    pub(crate) fn flush(&mut self) {
        if self.measurements.is_empty() {
            return;
        }

        // Measurements are only pushed while a sender is cached, and the sender
        // is only released after the buffer has been emptied, so this branch
        // should be unreachable. Discard rather than panic: `flush` also runs
        // from `Drop`, where a panic during unwinding aborts the process.
        let Some(sender) = self.sender.as_ref() else {
            self.measurements.clear();
            return;
        };

        if let Some(last) = self.measurements.last() {
            self.last_flush_elapsed_ns = last.elapsed_since_start_ns();
        }
        let batch = std::mem::replace(&mut self.measurements, Vec::with_capacity(BATCH_SIZE));

        if sender.send(batch).is_err() {
            // Receiver disconnected: drop the stale sender so a later `add`
            // re-fetches one, and reset the clock as `new` would.
            self.sender = None;
            self.last_flush_elapsed_ns = 0;
        }
    }
}

/// Flushes any measurements still buffered when the batch is dropped, so they
/// are handed to the sender instead of being discarded with the buffer.
impl<M: BatchedMeasurement> Drop for MeasurementBatch<M> {
    fn drop(&mut self) {
        self.flush();
    }
}

/// Registry of every live per-thread [`MeasurementBatch`] for a given event type.
///
/// Producing threads keep their batch in thread-local storage for a lock-light
/// hot path, but those batches are unreachable from other threads. On shutdown
/// the producing threads may still be alive (e.g. parked async runtime workers)
/// with buffered events that have not reached the worker. The registry holds a
/// `Weak` to each batch so [`BatchRegistry::flush_all`] can drain them all from
/// the shutting-down thread before the worker stops.
pub(crate) struct BatchRegistry<M: BatchedMeasurement> {
    /// Weak handles to registered batches. The registry does not keep them
    /// alive; handles whose batch has been dropped are removed by
    /// [`BatchRegistry::flush_all`].
    batches: Mutex<Vec<Weak<Mutex<MeasurementBatch<M>>>>>,
}

impl<M: BatchedMeasurement> BatchRegistry<M> {
    /// Creates an empty registry. `const` so it can initialize a `static`.
    pub(crate) const fn new() -> Self {
        Self {
            batches: Mutex::new(Vec::new()),
        }
    }

    /// Records a weak handle to `batch` so [`flush_all`](Self::flush_all) can
    /// reach it later.
    ///
    /// A dead `Weak` still pins its `Arc` allocation. Handles whose batch has
    /// already been dropped are therefore pruned right before the list would
    /// have to grow, so programs that keep spawning short-lived threads don't
    /// pile up dead entries until shutdown. Pruning only at capacity keeps
    /// registration amortized O(1).
    fn register(&self, batch: &Arc<Mutex<MeasurementBatch<M>>>) {
        // A panic elsewhere cannot leave a `Vec<Weak<_>>` half-updated, so a
        // poisoned lock is still safe to use. Skipping registration would make
        // this batch invisible to the shutdown drain.
        let mut batches = self.batches.lock().unwrap_or_else(PoisonError::into_inner);
        if batches.len() == batches.capacity() {
            batches.retain(|weak| weak.strong_count() > 0);
        }
        batches.push(Arc::downgrade(batch));
    }

    /// Flushes every still-alive registered batch and forgets dropped ones.
    ///
    /// Holds the registry lock for the whole pass and locks each batch in turn.
    /// Poisoned locks are recovered rather than skipped. A batch whose owning
    /// thread panicked mid-`add` still holds a consistent buffer, and skipping
    /// it would lose those events at shutdown.
    pub(crate) fn flush_all(&self) {
        let mut batches = self.batches.lock().unwrap_or_else(PoisonError::into_inner);
        batches.retain(|weak| {
            let Some(batch) = weak.upgrade() else {
                return false;
            };
            batch
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
                .flush();
            true
        });
    }
}

/// Allocates a new shared [`MeasurementBatch`] for a producing thread and
/// records it in `registry`, so [`BatchRegistry::flush_all`] can drain it
/// during shutdown.
///
/// The registry only stores a weak handle; the returned `Arc` is what keeps
/// the batch alive.
pub(crate) fn register_thread_batch<M: BatchedMeasurement>(
    registry: &'static BatchRegistry<M>,
) -> Arc<Mutex<MeasurementBatch<M>>> {
    let guarded = Mutex::new(MeasurementBatch::<M>::new());
    let handle = Arc::new(guarded);
    registry.register(&handle);
    handle
}