use crate::affinity::Affinity;
use crate::config::MetricsConfig;
use crate::exporter::Exporter;
use crate::{ControlEvent, Error, OwnedTags, UpdateEvent};
use log::error;
use metricus::Id;
#[cfg(feature = "rtrb")]
use rtrb::Consumer;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Write;
#[cfg(not(feature = "rtrb"))]
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// All registered counters, keyed by the metric [`Id`] they were created with.
pub type Counters = HashMap<Id, Counter>;
/// All registered histograms, keyed by the metric [`Id`] they were created with.
pub type Histograms = HashMap<Id, Histogram>;
/// Consumes metric events, keeps the current counter and histogram state, and
/// periodically hands that state to an [`Exporter`].
///
/// The two channel fields are `rtrb` consumers when the `rtrb` feature is
/// enabled and `std::sync::mpsc` receivers otherwise.
pub struct MetricsAggregator {
/// Incoming [`UpdateEvent`]s (counter increments and histogram records).
#[cfg(feature = "rtrb")]
rx_upd: Consumer<UpdateEvent>,
/// Incoming [`ControlEvent`]s (metric creation and deletion).
#[cfg(feature = "rtrb")]
rx_cnc: Consumer<ControlEvent>,
/// Incoming [`UpdateEvent`]s (counter increments and histogram records).
#[cfg(not(feature = "rtrb"))]
rx_upd: Receiver<UpdateEvent>,
/// Incoming [`ControlEvent`]s (metric creation and deletion).
#[cfg(not(feature = "rtrb"))]
rx_cnc: Receiver<ControlEvent>,
/// Destination the aggregated metrics are published to on every flush.
exporter: Exporter,
/// Counters currently registered with the aggregator.
counters: Counters,
/// Histograms currently registered with the aggregator.
histograms: Histograms,
/// Wall-clock time (nanoseconds since the Unix epoch) after which the next
/// flush is performed.
next_flush_time_ns: u64,
/// Delay between two flushes, in nanoseconds.
flush_interval_ns: u64,
}
impl MetricsAggregator {
/// Creates an aggregator that reads update events from `rx_upd` and control
/// events from `rx_cnc`, and publishes through `exporter` every
/// `flush_interval`.
///
/// The first flush is scheduled one `flush_interval` after construction.
/// Intervals that do not fit in a `u64` nanosecond count are clamped to
/// `u64::MAX` instead of being silently truncated, and the first deadline
/// saturates rather than overflowing, so an absurdly large interval simply
/// means "never flush".
pub fn new(
    #[cfg(feature = "rtrb")] rx_upd: Consumer<UpdateEvent>,
    #[cfg(feature = "rtrb")] rx_cnc: Consumer<ControlEvent>,
    #[cfg(not(feature = "rtrb"))] rx_upd: Receiver<UpdateEvent>,
    #[cfg(not(feature = "rtrb"))] rx_cnc: Receiver<ControlEvent>,
    exporter: Exporter,
    flush_interval: Duration,
) -> Self {
    // `Duration::as_nanos` yields a `u128`; convert once and clamp on overflow.
    let flush_interval_ns = u64::try_from(flush_interval.as_nanos()).unwrap_or(u64::MAX);
    Self {
        rx_upd,
        rx_cnc,
        exporter,
        counters: Default::default(),
        histograms: Default::default(),
        next_flush_time_ns: current_time_ns().saturating_add(flush_interval_ns),
        flush_interval_ns,
    }
}
pub fn start_on_thread(
#[cfg(feature = "rtrb")] rx_upd: Consumer<UpdateEvent>,
#[cfg(feature = "rtrb")] rx_cnc: Consumer<ControlEvent>,
#[cfg(not(feature = "rtrb"))] rx_upd: Receiver<UpdateEvent>,
#[cfg(not(feature = "rtrb"))] rx_cnc: Receiver<ControlEvent>,
config: MetricsConfig,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("aggregator".to_string())
.spawn(move || {
let affinity = Affinity::try_from(config.clone()).unwrap();
affinity.pin_current_thread_to_core();
let exporter = config
.exporter
.try_into()
.inspect_err(|e| error!("unable to create exporter: {e}"))
.unwrap();
let mut aggregator = MetricsAggregator::new(rx_upd, rx_cnc, exporter, config.flush_interval);
loop {
aggregator
.poll()
.inspect_err(|e| error!("error when polling aggregator: {e}"))
.unwrap();
std::thread::sleep(Duration::from_millis(1));
}
})
.unwrap()
}
/// Runs one aggregation cycle: drains pending events, then flushes the
/// metrics if the flush deadline has passed.
///
/// After a successful flush the next deadline is one flush interval after
/// the time sampled in this call. The addition saturates, so a very large
/// interval pushes the deadline to `u64::MAX` instead of overflowing.
///
/// # Errors
///
/// Propagates any error from event processing or from the flush. When the
/// flush fails the deadline is left untouched, so the next poll retries.
#[inline]
fn poll(&mut self) -> crate::Result<()> {
    self.process_events()?;
    let now = current_time_ns();
    if now > self.next_flush_time_ns {
        self.flush_metrics(now)?;
        self.next_flush_time_ns = now.saturating_add(self.flush_interval_ns);
    }
    Ok(())
}
/// Drains the control queue and then the update queue (`rtrb` build).
///
/// Control events are applied first, so metrics created in this batch exist
/// before the update events of the same poll are applied.
///
/// # Errors
///
/// Stops at the first event whose handler fails and returns that error.
#[cfg(feature = "rtrb")]
#[inline]
fn process_events(&mut self) -> crate::Result<()> {
    let Self {
        rx_cnc,
        rx_upd,
        counters,
        histograms,
        ..
    } = self;

    // Request exactly as many slots as the consumer reports right now.
    let pending_control = rx_cnc.slots();
    if let Ok(chunk) = rx_cnc.read_chunk(pending_control) {
        chunk
            .into_iter()
            .try_for_each(|event| Self::handle_control_event(counters, histograms, event))?;
    }

    let pending_updates = rx_upd.slots();
    if let Ok(chunk) = rx_upd.read_chunk(pending_updates) {
        chunk
            .into_iter()
            .try_for_each(|event| Self::handle_update_event(counters, histograms, event))?;
    }
    Ok(())
}
/// Drains the control channel and then the update channel without blocking
/// (`std::sync::mpsc` build).
///
/// Control events are applied first, so metrics created in this batch exist
/// before the update events of the same poll are applied.
///
/// # Errors
///
/// Stops at the first event whose handler fails and returns that error.
#[cfg(not(feature = "rtrb"))]
#[inline]
fn process_events(&mut self) -> crate::Result<()> {
    // `try_recv` returns `Err` as soon as nothing is pending, ending the loop.
    while let Ok(event) = self.rx_cnc.try_recv() {
        Self::handle_control_event(&mut self.counters, &mut self.histograms, event)?;
    }
    while let Ok(event) = self.rx_upd.try_recv() {
        Self::handle_update_event(&mut self.counters, &mut self.histograms, event)?;
    }
    Ok(())
}
/// Applies a single create/delete event to the metric registries.
///
/// Creating a metric whose id is already registered keeps the existing
/// entry; deleting an unknown id is a no-op. This function currently always
/// returns `Ok(())`.
#[inline]
fn handle_control_event(
    counters: &mut Counters,
    histograms: &mut Histograms,
    event: ControlEvent,
) -> crate::Result<()> {
    use std::collections::hash_map::Entry;

    match event {
        ControlEvent::CounterCreate(id, name, tags) => {
            // Only build the counter when the id is not registered yet.
            if let Entry::Vacant(slot) = counters.entry(id) {
                slot.insert(Counter::new(name, tags));
            }
        }
        ControlEvent::CounterDelete(id) => {
            counters.remove(&id);
        }
        ControlEvent::HistogramCreate(id, name, tags) => {
            // Only build the histogram when the id is not registered yet.
            if let Entry::Vacant(slot) = histograms.entry(id) {
                slot.insert(Histogram::new(name, tags));
            }
        }
        ControlEvent::HistogramDelete(id) => {
            histograms.remove(&id);
        }
    }
    Ok(())
}
/// Applies a single counter increment or histogram sample.
///
/// Updates that refer to an id with no registered metric are ignored.
///
/// # Errors
///
/// Returns the histogram's record error, wrapped with `Error::other`, when a
/// sample cannot be recorded.
#[inline]
fn handle_update_event(
    counters: &mut Counters,
    histograms: &mut Histograms,
    event: UpdateEvent,
) -> crate::Result<()> {
    match event {
        UpdateEvent::CounterIncrement(id, delta) => {
            let Some(counter) = counters.get_mut(&id) else {
                return Ok(());
            };
            counter.increment(delta);
        }
        UpdateEvent::HistogramRecord(id, value) => {
            let Some(histogram) = histograms.get_mut(&id) else {
                return Ok(());
            };
            histogram.inner.record(value).map_err(Error::other)?;
        }
    }
    Ok(())
}
/// Publishes every counter and histogram with the given `timestamp`, then
/// clears the histograms so the next interval starts empty.
///
/// Counters are not reset here, so they keep accumulating across flushes.
///
/// # Errors
///
/// Returns the exporter's error; in that case the histograms are left
/// untouched.
#[inline]
fn flush_metrics(&mut self, timestamp: u64) -> crate::Result<()> {
    self.exporter.publish_counters(&self.counters, timestamp)?;
    self.exporter.publish_histograms(&self.histograms, timestamp)?;
    for histogram in self.histograms.values_mut() {
        histogram.inner.clear();
    }
    Ok(())
}
}
/// A named, tagged counter holding a running total.
///
/// When serialized, the metadata fields are flattened next to `value`.
#[derive(Serialize)]
pub struct Counter {
    /// Running total of all increments applied so far.
    value: u64,
    /// Name and tags identifying the counter.
    #[serde(flatten)]
    meta_data: MetaData,
}
impl Counter {
    /// Creates a counter starting at zero.
    fn new(name: String, tags: OwnedTags) -> Self {
        Self {
            value: 0,
            meta_data: MetaData::new(name, tags),
        }
    }
    /// Adds `delta` to the running total.
    ///
    /// The addition saturates at `u64::MAX`. A plain `+=` would panic on
    /// overflow in debug builds and silently wrap to a small value in
    /// release builds.
    fn increment(&mut self, delta: u64) {
        self.value = self.value.saturating_add(delta);
    }
}
/// A named, tagged distribution of `u64` samples backed by `hdrhistogram`.
pub struct Histogram {
    /// Recorded samples.
    inner: hdrhistogram::Histogram<u64>,
    /// Name and tags identifying the histogram.
    meta_data: MetaData,
}
impl Histogram {
    /// Creates an empty histogram configured with 3 significant figures.
    ///
    /// # Panics
    ///
    /// Panics if `hdrhistogram` rejects that configuration.
    fn new(name: String, tags: OwnedTags) -> Self {
        let inner = hdrhistogram::Histogram::<u64>::new(3).unwrap();
        let meta_data = MetaData::new(name, tags);
        Self { inner, meta_data }
    }
}
/// Identity shared by counters and histograms: a name plus a set of tags.
#[derive(Serialize)]
struct MetaData {
    /// Metric name.
    name: String,
    /// Tags attached to the metric.
    tags: OwnedTags,
}
impl MetaData {
    /// Bundles `name` and `tags` into a `MetaData`.
    fn new(name: String, tags: OwnedTags) -> Self {
        MetaData { name, tags }
    }
}
/// Wire format used when writing metrics.
///
/// Because of `rename_all = "snake_case"`, serde reads and writes the variants
/// as `line_protocol` and `json`.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "snake_case")]
pub enum Encoder {
/// One text line per metric, produced by the `LineProtocol` encoder.
LineProtocol,
/// One JSON document per line, produced by the `Json` encoder. Only counters
/// are written in this format; histograms are skipped.
Json,
}
impl Encoder {
    /// Writes `counter`, stamped with `timestamp`, to `dst` in this encoder's
    /// format.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the selected encoder.
    pub fn encode_counter(&self, counter: &Counter, timestamp: u64, dst: &mut impl Write) -> std::io::Result<()> {
        match *self {
            Self::LineProtocol => LineProtocol::encode_counter(counter, timestamp, dst),
            Self::Json => Json::encode_counter(counter, timestamp, dst),
        }
    }

    /// Writes `histogram`, stamped with `timestamp`, to `dst` in this
    /// encoder's format.
    ///
    /// NOTE(review): the JSON format has no histogram encoding yet, so that
    /// variant writes nothing and reports success.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the selected encoder.
    pub fn encode_histogram(&self, histogram: &Histogram, timestamp: u64, dst: &mut impl Write) -> std::io::Result<()> {
        match *self {
            Self::LineProtocol => LineProtocol::encode_histogram(histogram, timestamp, dst),
            Self::Json => Ok(()),
        }
    }
}
struct LineProtocol;
impl LineProtocol {
fn encode_counter(counter: &Counter, timestamp: u64, dst: &mut impl Write) -> std::io::Result<()> {
dst.write_all(counter.meta_data.name.as_bytes())?;
for tag in counter.meta_data.tags.iter() {
dst.write_all(b",")?;
dst.write_all(tag.0.as_bytes())?;
dst.write_all(b"=")?;
dst.write_all(tag.1.as_bytes())?;
}
dst.write_all(b" value=")?;
dst.write_all(itoa::Buffer::new().format(counter.value).as_bytes())?;
dst.write_all(b"u ")?;
dst.write_all(itoa::Buffer::new().format(timestamp).as_bytes())?;
dst.write_all(b"\n")?;
Ok(())
}
fn encode_histogram(histogram: &Histogram, timestamp: u64, dst: &mut impl Write) -> std::io::Result<()> {
dst.write_all(histogram.meta_data.name.as_bytes())?;
for tag in histogram.meta_data.tags.iter() {
dst.write_all(b",")?;
dst.write_all(tag.0.as_bytes())?;
dst.write_all(b"=")?;
dst.write_all(tag.1.as_bytes())?;
}
dst.write_all(b" count=")?;
dst.write_all(itoa::Buffer::new().format(histogram.inner.len()).as_bytes())?;
dst.write_all(b"u,min=")?;
dst.write_all(itoa::Buffer::new().format(histogram.inner.min()).as_bytes())?;
dst.write_all(b"u,max=")?;
dst.write_all(itoa::Buffer::new().format(histogram.inner.max()).as_bytes())?;
dst.write_all(b"u,mean=")?;
dst.write_all(dtoa::Buffer::new().format(histogram.inner.mean()).as_bytes())?;
dst.write_all(b",p50=")?;
dst.write_all(
itoa::Buffer::new()
.format(histogram.inner.value_at_quantile(0.50))
.as_bytes(),
)?;
dst.write_all(b"u,p75=")?;
dst.write_all(
itoa::Buffer::new()
.format(histogram.inner.value_at_quantile(0.75))
.as_bytes(),
)?;
dst.write_all(b"u,p90=")?;
dst.write_all(
itoa::Buffer::new()
.format(histogram.inner.value_at_quantile(0.90))
.as_bytes(),
)?;
dst.write_all(b"u,p95=")?;
dst.write_all(
itoa::Buffer::new()
.format(histogram.inner.value_at_quantile(0.95))
.as_bytes(),
)?;
dst.write_all(b"u,p99=")?;
dst.write_all(
itoa::Buffer::new()
.format(histogram.inner.value_at_quantile(0.99))
.as_bytes(),
)?;
dst.write_all(b"u,p999=")?;
dst.write_all(
itoa::Buffer::new()
.format(histogram.inner.value_at_quantile(0.999))
.as_bytes(),
)?;
dst.write_all(b"u,p9999=")?;
dst.write_all(
itoa::Buffer::new()
.format(histogram.inner.value_at_quantile(0.9999))
.as_bytes(),
)?;
dst.write_all(b"u ")?;
dst.write_all(itoa::Buffer::new().format(timestamp).as_bytes())?;
dst.write_all(b"\n")?;
Ok(())
}
}
struct Json;
impl Json {
fn encode_counter(counter: &Counter, timestamp: u64, dst: &mut impl Write) -> std::io::Result<()> {
serde_json::to_writer(&mut *dst, &CounterWithTimestamp::new(counter, timestamp))
.map_err(std::io::Error::other)
.and_then(|_| dst.write_all(b"\n"))
}
}
/// Serialization view that pairs a borrowed [`Counter`] with a timestamp.
///
/// The counter's fields are flattened, so they appear next to `timestamp` in
/// the serialized output.
#[derive(Serialize)]
struct CounterWithTimestamp<'a> {
    /// Timestamp attached to the serialized record.
    timestamp: u64,
    /// Counter whose fields are flattened into the record.
    #[serde(flatten)]
    counter: &'a Counter,
}
impl<'a> CounterWithTimestamp<'a> {
    /// Pairs `counter` with `timestamp` without copying the counter.
    fn new(counter: &'a Counter, timestamp: u64) -> Self {
        CounterWithTimestamp { counter, timestamp }
    }
}
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` instead of panicking, and a
/// nanosecond count that does not fit in a `u64` is clamped to `u64::MAX`
/// instead of being truncated.
fn current_time_ns() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX))
}