mod types;
use crate::types::{Request, Sample as ProtoSample, Timeseries};
use chrono::Utc;
use reqwest::header::{
CONTENT_ENCODING, CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue,
USER_AGENT,
};
use std::collections::BTreeMap;
use std::time::Duration;
use tokio::sync::mpsc;
pub const DEFAULT_BUCKETS: &[f64] = &[
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5,
10.0,
];
struct MetricEvent {
name: String,
labels: Vec<(String, String)>,
value: f64,
timestamp: i64,
}
#[derive(Clone)]
pub struct Metric {
tx: mpsc::Sender<MetricEvent>,
}
impl Metric {
pub fn new(url: impl Into<String>, buffer_size: usize) -> Self {
let default_client = Self::default_http_client();
Self::with_client_and_headers(
url,
buffer_size,
default_client,
HeaderMap::new(),
)
}
pub fn with_client(
url: impl Into<String>,
buffer_size: usize,
custom_http: reqwest::Client,
) -> Self {
Self::with_client_and_headers(
url,
buffer_size,
custom_http,
HeaderMap::new(),
)
}
pub fn with_headers(
url: impl Into<String>,
buffer_size: usize,
extra_headers: HeaderMap,
) -> Self {
let default_client = Self::default_http_client();
Self::with_client_and_headers(
url,
buffer_size,
default_client,
extra_headers,
)
}
pub fn with_client_and_headers(
url: impl Into<String>,
buffer_size: usize,
custom_http: reqwest::Client,
extra_headers: HeaderMap,
) -> Self {
let (tx, rx) = mpsc::channel(buffer_size);
let url_str = url.into();
tokio::spawn(async move {
Self::background_worker(url_str, rx, custom_http, extra_headers)
.await;
});
Self { tx }
}
fn default_http_client() -> reqwest::Client {
reqwest::Client::builder()
.pool_max_idle_per_host(10)
.tcp_keepalive(Duration::from_secs(90))
.build()
.unwrap_or_default()
}
pub fn name(&self, name: impl Into<String>) -> MetricBuilder {
MetricBuilder {
manager: self.clone(),
name: name.into(),
labels: Vec::new(),
}
}
async fn background_worker(
url: String,
mut rx: mpsc::Receiver<MetricEvent>,
client: reqwest::Client,
extra_headers: HeaderMap,
) {
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
HeaderValue::from_static(
"application/x-protobuf;proto=io.prometheus.write.v2.Request",
),
);
headers.insert(CONTENT_ENCODING, HeaderValue::from_static("snappy"));
headers.insert(
HeaderName::from_static("x-prometheus-remote-write-version"),
HeaderValue::from_static("2.0.0"),
);
headers.insert(USER_AGENT, HeaderValue::from_static("promwrite/0.1.3"));
for (k, v) in extra_headers.iter() {
headers.insert(k.clone(), v.clone());
}
let mut buffer = Vec::with_capacity(2000);
let mut interval = tokio::time::interval(Duration::from_millis(50));
loop {
tokio::select! {
Some(event) = rx.recv() => {
buffer.push(event);
if buffer.len() >= 1000 {
Self::flush_batch(&client, &url, &headers, &mut buffer).await;
}
}
_ = interval.tick() => {
if !buffer.is_empty() {
Self::flush_batch(&client, &url, &headers, &mut buffer).await;
}
}
}
}
}
async fn flush_batch(
client: &reqwest::Client,
url: &str,
headers: &HeaderMap,
buffer: &mut Vec<MetricEvent>,
) {
let mut req = Request {
symbols: vec![],
timeseries: vec![],
};
let mut symbol_map: BTreeMap<String, u32> = BTreeMap::new();
let mut get_or_insert_symbol = |s: &str, req: &mut Request| -> u32 {
if let Some(&idx) = symbol_map.get(s) {
idx
} else {
let idx = req.symbols.len() as u32;
req.symbols.push(s.to_string());
symbol_map.insert(s.to_string(), idx);
idx
}
};
for event in buffer.drain(..) {
let mut labels_map = BTreeMap::new();
labels_map.insert("__name__".to_string(), event.name);
for (k, v) in event.labels {
labels_map.insert(k, v);
}
let mut labels_refs = Vec::with_capacity(labels_map.len() * 2);
for (k, v) in &labels_map {
labels_refs.push(get_or_insert_symbol(k, &mut req));
labels_refs.push(get_or_insert_symbol(v, &mut req));
}
req.timeseries.push(Timeseries {
labels_refs,
samples: vec![ProtoSample {
value: event.value,
timestamp: event.timestamp,
}],
});
}
let encoded = prost::Message::encode_to_vec(&req);
if let Ok(compressed) = snap::raw::Encoder::new().compress_vec(&encoded)
{
let mut request_builder =
client.post(url).headers(headers.clone()).body(compressed);
if let Ok(parsed_url) = reqwest::Url::parse(url) {
let username = parsed_url.username();
let password = parsed_url.password();
if !username.is_empty() {
request_builder =
request_builder.basic_auth(username, password);
}
}
match request_builder.send().await {
Ok(resp) => {
if !resp.status().is_success() {
tracing::error!(
status = resp.status().as_u16(),
"Remote-write response error"
);
}
}
Err(err) => {
tracing::error!(error = %err, "Remote-write network request failed");
}
}
}
}
}
#[derive(Clone)]
pub struct MetricBuilder {
pub manager: Metric,
pub name: String,
pub labels: Vec<(String, String)>,
}
impl MetricBuilder {
pub fn label(
mut self,
name: impl Into<String>,
value: impl Into<String>,
) -> Self {
self.labels.push((name.into(), value.into()));
self
}
pub fn set_with_ts(&self, value: f64, timestamp_ms: i64) {
let event = MetricEvent {
name: self.name.clone(),
labels: self.labels.clone(),
value,
timestamp: timestamp_ms,
};
if let Err(_) = self.manager.tx.try_send(event) {
tracing::warn!(metric = %self.name, "Buffer queue full, event dropped");
}
}
pub fn set(&self, value: f64) {
self.set_with_ts(value, Utc::now().timestamp_millis());
}
pub fn inc(&self) {
self.set(1.0);
}
pub fn add(&self, value: f64) {
if value > 0.0 {
self.set(value);
}
}
}
pub struct Histogram {
sum_builder: MetricBuilder,
count_builder: MetricBuilder,
bucket_builders: Vec<(f64, MetricBuilder)>,
inf_builder: MetricBuilder,
}
impl Histogram {
pub fn new_with_default(builder: MetricBuilder) -> Self {
Self::new(builder, DEFAULT_BUCKETS.to_vec())
}
pub fn new(builder: MetricBuilder, buckets: Vec<f64>) -> Self {
let mut sorted_buckets = buckets;
sorted_buckets.sort_by(|a, b| a.partial_cmp(b).unwrap());
let mut sum_builder =
builder.manager.name(format!("{}_sum", builder.name));
for (k, v) in &builder.labels {
sum_builder = sum_builder.label(k, v);
}
let mut count_builder =
builder.manager.name(format!("{}_count", builder.name));
for (k, v) in &builder.labels {
count_builder = count_builder.label(k, v);
}
let mut bucket_builders = Vec::with_capacity(sorted_buckets.len());
for &bucket in &sorted_buckets {
let b_builder = builder.clone().label("le", bucket.to_string());
bucket_builders.push((bucket, b_builder));
}
let inf_builder = builder.clone().label("le", "+Inf");
Self {
sum_builder,
count_builder,
bucket_builders,
inf_builder,
}
}
pub fn observe_with_ts(&self, value: f64, timestamp_ms: i64) {
let _ = self.sum_builder.manager.tx.try_send(MetricEvent {
name: self.sum_builder.name.clone(),
labels: self.sum_builder.labels.clone(),
value,
timestamp: timestamp_ms,
});
let _ = self.count_builder.manager.tx.try_send(MetricEvent {
name: self.count_builder.name.clone(),
labels: self.count_builder.labels.clone(),
value: 1.0,
timestamp: timestamp_ms,
});
for (bucket, b_builder) in &self.bucket_builders {
if value <= *bucket {
let _ = b_builder.manager.tx.try_send(MetricEvent {
name: b_builder.name.clone(),
labels: b_builder.labels.clone(),
value: 1.0,
timestamp: timestamp_ms,
});
}
}
let _ = self.inf_builder.manager.tx.try_send(MetricEvent {
name: self.inf_builder.name.clone(),
labels: self.inf_builder.labels.clone(),
value: 1.0,
timestamp: timestamp_ms,
});
}
pub fn observe(&self, value: f64) {
self.observe_with_ts(value, Utc::now().timestamp_millis());
}
}