use std::{borrow::BorrowMut, sync::Arc};
use std::marker::PhantomData;
use std::time::Duration;
use log::{error, info};
use rand::Rng;
use redis::AsyncCommands;
use serde::Serialize;
use parking_lot::Mutex;
use serde_json::json;
use crate::{retry_call, ErrorTypes, RedisObjects};
/// Bound alias for types that can be used as a metrics payload.
///
/// A payload must be serializable (it is converted to JSON before being
/// published), constructible via `Default` (a fresh value replaces the
/// counters on every reset) and safe to move into a spawned task.
pub trait MetricMessage:
    Serialize + Default + Send + Sync + 'static
{
}

/// Every type that satisfies the bounds is automatically a `MetricMessage`.
impl<T> MetricMessage for T where T: Serialize + Default + Send + Sync + 'static {}
/// Configuration for an [`AutoExportingMetrics`] handle.
///
/// Created through `new`, adjusted with the chained setter methods and turned
/// into a running exporter by `start`. After `start` the builder is kept
/// (behind an `Arc`) as the shared configuration of the exporter.
pub struct AutoExportingMetricsBuilder<Message: MetricMessage> {
    /// Name of the channel the serialized metrics are published on.
    channel_name: String,
    /// Optional value written into the `"name"` field of each exported message.
    counter_name: Option<String>,
    /// Value written into the `"type"` field of each exported message.
    counter_type: String,
    /// Value written into the `"host"` field of each exported message.
    host: String,
    /// Handle whose connection pool is used to publish the messages.
    store: Arc<RedisObjects>,
    /// Zero-sized marker tying the configuration to the message type.
    data_type: PhantomData<Message>,
    /// When `false`, snapshots whose fields are all zero are not published.
    export_zero: bool,
    /// Longest time the exporter waits between two exports.
    export_interval: Duration,
    /// Wakes the exporter early when an immediate export is requested.
    export_notify: tokio::sync::Notify,
}
impl<Message: MetricMessage> AutoExportingMetricsBuilder<Message> {
pub (crate) fn new(store: Arc<RedisObjects>, channel_name: String, counter_type: String) -> Self {
Self {
channel_name,
counter_name: None,
counter_type,
host: format!("{:x}", rand::rng().random::<u128>()),
store,
export_zero: true,
export_interval: Duration::from_secs(5),
data_type: Default::default(),
export_notify: tokio::sync::Notify::new(),
}
}
pub fn counter_name(mut self, value: String) -> Self {
self.counter_name = Some(value); self
}
pub fn host(mut self, value: String) -> Self {
self.host = value; self
}
pub fn export_interval(mut self, value: Duration) -> Self {
self.export_interval = value; self
}
pub fn export_zero(mut self, value: bool) -> Self {
self.export_zero = value; self
}
pub fn start(self) -> AutoExportingMetrics<Message> {
let current = Arc::new(Mutex::new(Message::default()));
let metrics = AutoExportingMetrics{
config: Arc::new(self),
current,
};
metrics.clone().exporter();
metrics
}
}
/// Updates one field of a metrics message behind a `lock()`-able handle.
///
/// Forms:
/// * `increment!(counter, field)` adds `1` to `field`.
/// * `increment!(counter, field, value)` adds `value` to `field`.
/// * `increment!(timer, counter, field)` calls `field.increment(0.0)`.
/// * `increment!(timer, counter, field, value)` calls `field.increment(value)`.
///
/// `counter` may be any expression with a `lock()` method whose result gives
/// mutable access to the named field. The lock is held only for the duration
/// of the single update statement.
///
/// Recursive invocations go through `$crate::` so the macro also works when
/// the caller reaches it by path without importing the name `increment`.
///
/// NOTE(review): arms are tried in order, so `increment!(timer, c, f)` where
/// `c` is a bare identifier is matched by the three-argument counter arm
/// (with `timer` taken as the counter expression) before the timer arm is
/// considered. The three-argument timer form therefore only applies when the
/// counter is a non-identifier expression such as `self.counter` or `&c`.
#[macro_export]
macro_rules! increment {
    ($counter:expr, $field:ident) => {
        $crate::increment!($counter, $field, 1)
    };
    ($counter:expr, $field:ident, $value:expr) => {
        $counter.lock().$field += $value
    };
    (timer, $counter:expr, $field:ident) => {
        $crate::increment!(timer, $counter, $field, 0.0)
    };
    (timer, $counter:expr, $field:ident, $value:expr) => {
        $counter.lock().$field.increment($value)
    };
}
pub use increment;
/// Handle to a set of counters that a background task publishes periodically.
///
/// Cloning the handle yields another reference to the same counters and the
/// same configuration.
pub struct AutoExportingMetrics<Message: MetricMessage> {
    /// Shared exporter settings (channel, labels, interval, wake-up signal).
    config: Arc<AutoExportingMetricsBuilder<Message>>,
    /// The counters accumulated since the last reset.
    current: Arc<Mutex<Message>>,
}
impl<Message: MetricMessage> Clone for AutoExportingMetrics<Message> {
fn clone(&self) -> Self {
Self { config: self.config.clone(), current: self.current.clone() }
}
}
impl<Message: MetricMessage> AutoExportingMetrics<Message> {
    /// Spawns the background task that runs `export_loop`.
    ///
    /// Each time the loop returns an error it is logged, the task sleeps for
    /// five seconds and the loop is started again. The task ends once the
    /// loop returns `Ok`. Uses `tokio::spawn`, so it has to be called from
    /// within a Tokio runtime.
    fn exporter(mut self) {
        tokio::spawn(async move {
            while let Err(err) = self.export_loop().await {
                error!("Error in metrics exporter {}: {}", self.config.counter_type, err);
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        });
    }

    /// Returns `true` when `obj` is a JSON number equal to zero.
    ///
    /// Non-numeric values (strings, objects, arrays, booleans, null) are
    /// never considered zero.
    fn is_zero(&self, obj: &serde_json::Value) -> bool {
        obj.as_i64() == Some(0) || obj.as_u64() == Some(0) || obj.as_f64() == Some(0.0)
    }

    /// Returns `true` when `obj` is a JSON object whose values are all zero
    /// numbers; anything that is not an object yields `false`.
    fn is_all_zero(&self, obj: &serde_json::Value) -> bool {
        match obj.as_object() {
            Some(fields) => fields.values().all(|value| self.is_zero(value)),
            None => false,
        }
    }

    /// Takes the current counters, and publishes them as JSON.
    ///
    /// The counters are reset first. When `export_zero` is disabled and the
    /// snapshot is all zero, nothing is published. Otherwise the `"type"`,
    /// `"name"` and `"host"` fields from the configuration are added (when
    /// the snapshot serializes to an object) and the result is published on
    /// the configured channel.
    ///
    /// # Errors
    ///
    /// Returns an error when serialization or publishing fails. The counters
    /// have already been reset at that point, so the values taken for a
    /// failed export are not restored.
    async fn export_once(&mut self) -> Result<(), ErrorTypes> {
        let snapshot = self.reset();
        let mut outgoing = serde_json::to_value(&snapshot)?;

        if !self.config.export_zero && self.is_all_zero(&outgoing) {
            return Ok(());
        }

        if let Some(obj) = outgoing.as_object_mut() {
            obj.insert("type".to_owned(), json!(self.config.counter_type));
            obj.insert("name".to_owned(), json!(self.config.counter_name));
            obj.insert("host".to_owned(), json!(self.config.host));
        }
        let data = serde_json::to_string(&outgoing)?;
        let _receivers: u32 = retry_call!(self.config.store.pool, publish, self.config.channel_name.as_str(), data.as_str())?;
        Ok(())
    }

    /// Exports repeatedly until this task holds the last handle.
    ///
    /// Each round waits until either the export interval elapses or an
    /// export is requested through the notifier, then exports once. When the
    /// counters are no longer shared with any other handle, one final export
    /// is made and the loop returns `Ok`.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `export_once`.
    async fn export_loop(&mut self) -> Result<(), ErrorTypes> {
        loop {
            // A timeout is the normal periodic case, so its result is ignored.
            let _ = tokio::time::timeout(self.config.export_interval, self.config.export_notify.notified()).await;
            self.export_once().await?;

            // A count of one means only the exporter's own handle is left.
            if Arc::strong_count(&self.current) == 1 {
                info!("Stopping metrics exporter: {}", self.config.channel_name);
                self.export_once().await?;
                return Ok(());
            }
        }
    }

    /// Locks the counters and returns a guard giving mutable access to them.
    ///
    /// The guard borrows from this handle; the counters stay locked until it
    /// is dropped.
    pub fn lock(&self) -> parking_lot::MutexGuard<'_, Message> {
        self.current.lock()
    }

    /// Replaces the counters with `Message::default()` and returns the
    /// previous values.
    pub fn reset(&self) -> Message {
        // The replacement is built before the lock is taken.
        let mut message = Message::default();
        std::mem::swap(&mut message, self.current.lock().borrow_mut());
        message
    }

    /// Asks the exporter task to export now instead of waiting for the
    /// export interval to elapse.
    pub fn export(&self) {
        self.config.export_notify.notify_one()
    }
}
/// Wakes the exporter when one of the last handles goes away.
///
/// The count is read before this handle's own reference is released, so a
/// value of two or less means at most one other handle to the counters
/// remains. Notifying lets the exporter loop run its check without waiting
/// for the export interval.
impl<M: MetricMessage> Drop for AutoExportingMetrics<M> {
    fn drop(&mut self) {
        let live_handles = Arc::strong_count(&self.current);
        if live_handles > 2 {
            return;
        }
        self.export()
    }
}
/// Installs a debug-level test logger; a failure to initialise (for example
/// because a logger is already installed) is ignored.
#[cfg(test)]
fn init() {
    let mut builder = env_logger::builder();
    builder.filter_level(log::LevelFilter::Debug).is_test(true);
    let _ = builder.try_init();
}
// End-to-end check of the auto-exporting counters: values incremented on a
// handle must arrive on the subscribed channel, both through the periodic
// export and through explicit/drop-triggered exports.
#[tokio::test]
async fn auto_exporting_counter() {
use log::info;
init();
use serde::Deserialize;
use crate::test::redis_connection;
let connection = redis_connection().await;
info!("redis connected");
// Message type used for this test; all fields default to zero.
#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
struct MetricKind {
started: u64,
finished: u64,
}
// Subscribe before any counter is started so the exported messages can be received below.
let mut subscribe = connection.subscribe_json::<MetricKind>("test_metrics_channel".to_owned()).await;
{
info!("Fast export");
// Very short interval: the periodic export is expected to deliver the increment.
// export_zero(false) keeps all-zero snapshots off the channel.
let counter = connection.auto_exporting_metrics::<MetricKind>("test_metrics_channel".to_owned(), "component-x".to_owned())
.export_interval(Duration::from_micros(10))
.export_zero(false)
.start();
increment!(counter, started, 5);
info!("Waiting for export");
assert_eq!(subscribe.recv().await.unwrap().unwrap(), MetricKind{started: 5, finished: 0});
// `counter` is dropped here with nothing accumulated since the last export.
}
{
info!("slow export");
// Very long interval: nothing is exported unless it is requested explicitly.
let counter = connection.auto_exporting_metrics::<MetricKind>("test_metrics_channel".to_owned(), "component-x".to_owned())
.export_interval(Duration::from_secs(1000))
.export_zero(false)
.start();
increment!(counter, started);
increment!(counter, finished);
// Discard what was counted so far; only the increments below should be exported.
counter.reset();
increment!(counter, started);
increment!(counter, started);
increment!(counter, finished);
// Request an immediate export instead of waiting for the interval.
counter.export();
assert_eq!(subscribe.recv().await.unwrap().unwrap(), MetricKind{started: 2, finished: 1});
increment!(counter, finished, 5);
increment!(counter, finished);
// Dropping `counter` here notifies the exporter, which should publish the remaining counts.
}
// The counts accumulated after the explicit export must still arrive after the handle was dropped.
let result = tokio::time::timeout(Duration::from_secs(10), subscribe.recv()).await.unwrap();
assert_eq!(result.unwrap().unwrap(), MetricKind{started: 0, finished: 6});
}