#![doc = include_str!("../README.md")]
mod affinity;
mod aggregator;
pub mod config;
mod error;
mod exporter;
use crate::aggregator::MetricsAggregator;
use crate::config::MetricsConfig;
use metricus::{Id, Metrics, PreAllocatedMetric, Tag, Tags, set_metrics};
#[cfg(feature = "rtrb")]
use rtrb::Producer;
#[cfg(not(feature = "rtrb"))]
use std::sync::mpsc::SyncSender;
pub use error::{Error, Result};
use std::collections::HashMap;
type OwnedTag = (String, String);
type OwnedTags = Vec<OwnedTag>;
trait ToOwnedTags {
fn to_owned_tags(self) -> OwnedTags;
}
impl ToOwnedTags for Tags<'_> {
fn to_owned_tags(self) -> OwnedTags {
self.iter().map(|tag| tag.to_owned_tag()).collect()
}
}
trait ToOwnedTag {
fn to_owned_tag(self) -> OwnedTag;
}
impl ToOwnedTag for Tag<'_> {
fn to_owned_tag(self) -> OwnedTag {
(self.0.to_owned(), self.1.to_owned())
}
}
pub struct MetricsAgent {
#[cfg(feature = "rtrb")]
tx_upd: Producer<UpdateEvent>,
#[cfg(feature = "rtrb")]
tx_cnc: Producer<ControlEvent>,
#[cfg(not(feature = "rtrb"))]
tx_upd: SyncSender<UpdateEvent>,
#[cfg(not(feature = "rtrb"))]
tx_cnc: SyncSender<ControlEvent>,
default_tags: OwnedTags,
next_id: Id,
metric_key_to_id: HashMap<MetricKey, Id>,
}
impl MetricsAgent {
pub fn init() -> Result<()> {
Self::init_with_config(MetricsConfig::default())
}
pub fn init_with_config(config: MetricsConfig) -> Result<()> {
#[cfg(feature = "rtrb")]
let (tx_upd, rx_upd) = rtrb::RingBuffer::new(config.event_channel_size);
#[cfg(feature = "rtrb")]
let (tx_cnc, rx_cnc) = rtrb::RingBuffer::new(1024);
#[cfg(not(feature = "rtrb"))]
let (tx_upd, rx_upd) = std::sync::mpsc::sync_channel(config.event_channel_size);
#[cfg(not(feature = "rtrb"))]
let (tx_cnc, rx_cnc) = std::sync::mpsc::sync_channel(1024);
let _ = MetricsAggregator::start_on_thread(rx_upd, rx_cnc, config.clone());
let mut agent = MetricsAgent::new(tx_upd, tx_cnc, config.default_tags);
for metric in config.pre_allocated_metrics {
agent.register_metric_with_id(metric);
}
set_metrics(agent);
Ok(())
}
#[cfg(feature = "rtrb")]
fn new(tx_upd: Producer<UpdateEvent>, tx_cnc: Producer<ControlEvent>, default_tags: OwnedTags) -> Self {
Self {
tx_upd,
tx_cnc,
default_tags,
next_id: 0,
metric_key_to_id: Default::default(),
}
}
#[cfg(not(feature = "rtrb"))]
fn new(tx_upd: SyncSender<UpdateEvent>, tx_cnc: SyncSender<ControlEvent>, default_tags: OwnedTags) -> Self {
Self {
tx_upd,
tx_cnc,
default_tags,
next_id: 0,
metric_key_to_id: Default::default(),
}
}
#[inline]
fn assign_next_id(&mut self, name: &str, tags: OwnedTags) -> Id {
*self
.metric_key_to_id
.entry(MetricKey::new(name, tags))
.or_insert_with(|| {
let id = self.next_id;
self.next_id += 1;
id
})
}
#[inline]
fn send_control_event(&mut self, event: ControlEvent) {
#[cfg(feature = "rtrb")]
let _ = self.tx_cnc.push(event);
#[cfg(not(feature = "rtrb"))]
let _ = self.tx_cnc.try_send(event);
}
#[inline]
fn send_update_event(&mut self, event: UpdateEvent) {
#[cfg(feature = "rtrb")]
let _ = self.tx_upd.push(event);
#[cfg(not(feature = "rtrb"))]
let _ = self.tx_upd.try_send(event);
}
fn enrich_with_counter_tags(&self, tags: &mut OwnedTags) {
tags.push(("type", "counter").to_owned_tag());
tags.extend(self.default_tags.clone());
tags.sort();
tags.dedup();
}
fn enrich_with_histogram_tags(&self, tags: &mut OwnedTags) {
tags.push(("type", "histogram").to_owned_tag());
tags.extend(self.default_tags.clone());
tags.sort();
tags.dedup();
}
fn register_metric_with_id(&mut self, metric: PreAllocatedMetric) {
match metric {
PreAllocatedMetric::Counter { name, id, mut tags } => {
self.enrich_with_counter_tags(&mut tags);
self.send_control_event(ControlEvent::CounterCreate(id, name, tags))
}
PreAllocatedMetric::Histogram { name, id, mut tags } => {
self.enrich_with_histogram_tags(&mut tags);
self.send_control_event(ControlEvent::HistogramCreate(id, name, tags))
}
}
}
}
impl Metrics for MetricsAgent {
fn name(&self) -> &'static str {
"metrics-agent"
}
fn new_counter(&mut self, name: &str, tags: Tags) -> Id {
let mut tags = tags.to_owned_tags();
self.enrich_with_counter_tags(&mut tags);
let id = self.assign_next_id(name, tags.clone());
self.send_control_event(ControlEvent::CounterCreate(id, name.to_owned(), tags));
id
}
fn delete_counter(&mut self, id: Id) {
self.send_control_event(ControlEvent::CounterDelete(id));
}
#[inline]
fn increment_counter_by(&mut self, id: Id, delta: u64) {
self.send_update_event(UpdateEvent::CounterIncrement(id, delta));
}
fn new_histogram(&mut self, name: &str, tags: Tags) -> Id {
let mut tags = tags.to_owned_tags();
self.enrich_with_histogram_tags(&mut tags);
let id = self.assign_next_id(name, tags.clone());
self.send_control_event(ControlEvent::HistogramCreate(id, name.to_owned(), tags));
id
}
fn delete_histogram(&mut self, id: Id) {
self.send_control_event(ControlEvent::HistogramDelete(id));
}
#[inline]
fn record(&mut self, id: Id, value: u64) {
self.send_update_event(UpdateEvent::HistogramRecord(id, value));
}
}
#[derive(Debug)]
enum ControlEvent {
CounterCreate(Id, String, OwnedTags),
CounterDelete(Id),
HistogramCreate(Id, String, OwnedTags),
HistogramDelete(Id),
}
#[derive(Debug)]
enum UpdateEvent {
CounterIncrement(Id, u64),
HistogramRecord(Id, u64),
}
#[derive(Eq, PartialEq, Hash, Clone)]
struct MetricKey {
name: String,
tags: OwnedTags,
}
impl MetricKey {
fn new(name: &str, tags: OwnedTags) -> Self {
Self {
name: name.to_owned(),
tags,
}
}
}