#![allow(dead_code)]
#[cfg(feature = "threads")]
use crossbeam_channel::{Receiver, Sender};
#[cfg(not(feature = "threads"))]
use std::{
cell::RefCell,
collections::VecDeque
};
use fxhash::FxHashMap;
pub use hdrhistogram as histogram;
use hdrhistogram::Histogram;
use parking_lot::Mutex;
use quanta::Clock;
use std::{
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
thread::JoinHandle,
};
lazy_static::lazy_static! {
static ref CHANNEL: Channel = Channel::new();
}
#[cfg(not(feature = "disable"))]
#[macro_export]
macro_rules! scope(
($span_name:literal) => (
let __INSTR_METRICS_SCOPE = $crate::Span::new($span_name);
)
);
#[cfg(feature = "disable")]
#[macro_export]
macro_rules! scope(
($span_name:literal) => (
)
);
pub enum Event {
SpanEnter(&'static str),
SpanExit {
span_name: &'static str,
elapsed: u64,
},
}
#[cfg(feature = "threads")]
struct Channel {
subscribers: AtomicU32,
channel: (Sender<Event>, Receiver<Event>),
clock: Clock,
}
#[cfg(feature = "threads")]
impl Channel {
fn new() -> Self {
Self {
subscribers: AtomicU32::new(0),
channel: crossbeam_channel::unbounded(),
clock: Clock::default(),
}
}
#[cfg(not(feature = "disable"))]
fn send(&self, event: Event) {
if self.subscribers.load(Ordering::SeqCst) > 0 {
self.channel.0.send(event).unwrap();
}
}
#[cfg(not(feature = "disable"))]
fn recv(&self) -> Option<Event> {
if self.subscribers.load(Ordering::SeqCst) > 0 {
if let Ok(event) = self.channel.1.try_recv() {
return Some(event);
}
}
None
}
#[cfg(feature = "disable")]
fn send(&self, event: Event) {}
#[cfg(feature = "disable")]
fn recv(&self) -> Option<Event> {
None
}
}
#[cfg(not(feature = "threads"))]
struct Channel {
queue: RefCell<VecDeque<Event>>,
clock: Clock,
}
#[cfg(not(feature = "threads"))]
impl Channel {
fn new() -> Self {
Self {
queue: RefCell::new(VecDeque::with_capacity(1024)),
clock: Clock::default(),
}
}
#[cfg(not(feature = "disable"))]
fn send(&self, event: Event) {
self.queue.borrow_mut().push_back(event);
}
#[cfg(not(feature = "disable"))]
fn recv(&self) -> Option<Event> {
self.queue.borrow_mut().pop_front()
}
#[cfg(feature = "disable")]
fn send(&self, event: Event) {}
#[cfg(feature = "disable")]
fn recv(&self) -> Option<Event> {
None
}
}
#[cfg(not(feature = "threads"))]
unsafe impl Send for Channel {}
#[cfg(not(feature = "threads"))]
unsafe impl Sync for Channel {}
pub struct Span {
name: &'static str,
start: u64,
}
impl Span {
pub fn new(name: &'static str) -> Self {
CHANNEL.send(Event::SpanEnter(name));
Self {
name,
start: CHANNEL.clock.now(),
}
}
}
impl Drop for Span {
fn drop(&mut self) {
let elapsed = CHANNEL.clock.now() - self.start;
CHANNEL.send(Event::SpanExit {
span_name: self.name,
elapsed,
});
}
}
pub struct Metrics {
#[cfg(feature = "threads")]
histograms: Arc<Mutex<FxHashMap<&'static str, Histogram<u64>>>>,
#[cfg(feature = "threads")]
worker_handle: Option<JoinHandle<()>>,
#[cfg(feature = "threads")]
worker_flag: Arc<AtomicBool>,
#[cfg(not(feature = "threads"))]
histograms: RefCell<FxHashMap<&'static str, Histogram<u64>>>,
sigfig: u8,
}
impl Metrics {
pub fn for_each_histogram<F>(&self, mut f: F)
where
F: FnMut(&'static str, &Histogram<u64>),
{
#[cfg(feature = "threads")]
{
self.histograms
.lock()
.iter()
.for_each(|(name, histogram)| (f)(name, histogram))
}
#[cfg(not(feature = "threads"))]
{
self.flush();
self.histograms.borrow()
.iter()
.for_each(|(name, histogram)| (f)(name, histogram))
}
}
#[cfg(feature = "threads")]
pub fn flush(&self) {
while !CHANNEL.channel.1.is_empty() {
std::thread::yield_now();
}
}
#[cfg(not(feature = "threads"))]
pub fn flush(&self) {
while let Some(event) = CHANNEL.recv() {
if let Event::SpanExit { span_name, elapsed } = event {
self.histograms
.borrow_mut()
.entry(span_name)
.or_insert_with(
|| Histogram::new_with_bounds(1, 1_000_000_000, self.sigfig).unwrap(),
)
.record(elapsed);
}
}
}
#[allow(unused_must_use)]
#[cfg(feature = "threads")]
pub fn new(sigfig: u8) -> Metrics {
let worker_flag = Arc::new(AtomicBool::new(true));
let histograms = Arc::new(Mutex::new(FxHashMap::default()));
let inner_histograms = histograms.clone();
let inner_flag = worker_flag.clone();
let worker_handle = std::thread::spawn(move || {
while inner_flag.load(Ordering::Relaxed) {
while let Some(event) = CHANNEL.recv() {
if let Event::SpanExit { span_name, elapsed } = event {
inner_histograms
.lock()
.entry(span_name)
.or_insert_with(
|| Histogram::new_with_bounds(1, 1_000_000_000, sigfig).unwrap(),
)
.record(elapsed);
}
}
}
});
CHANNEL.subscribers.fetch_add(1, Ordering::SeqCst);
Self {
histograms,
worker_flag,
sigfig,
worker_handle: Some(worker_handle),
}
}
#[cfg(not(feature = "threads"))]
pub fn new(sigfig: u8) -> Metrics {
Self {
histograms: RefCell::new(FxHashMap::default()),
sigfig
}
}
}
impl Drop for Metrics {
fn drop(&mut self) {
#[cfg(feature = "threads")]
{
CHANNEL.subscribers.fetch_sub(1, Ordering::SeqCst);
self.worker_flag.store(false, Ordering::Relaxed);
self.worker_handle.take().unwrap().join().unwrap();
}
}
}