use chrono::{DateTime, Utc};
use crossbeam_channel::{Receiver, Sender};
use std::cell::RefCell;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tracing::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
use crate::core::Value;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::Engine;
thread_local! {
pub static IS_METRICS_THREAD: RefCell<bool> = const { RefCell::new(false) };
}
#[derive(Debug, Clone)]
pub struct MetricEvent {
pub name: String,
pub description: Option<String>,
pub unit: Option<String>,
pub metric_type: String,
pub value: f64,
pub attributes: Vec<(String, String)>, pub timestamp: DateTime<Utc>,
}
pub struct SystemMetricsLayer {
sender: Sender<MetricEvent>,
}
impl SystemMetricsLayer {
pub fn new(sender: Sender<MetricEvent>) -> Self {
Self { sender }
}
}
impl<S> Layer<S> for SystemMetricsLayer
where
S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
{
fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
if IS_METRICS_THREAD.with(|f| *f.borrow()) {
return;
}
let mut visitor = MetricVisitor::default();
event.record(&mut visitor);
if let (Some(name), Some(m_type), Some(val)) = (
visitor.metric_name.clone(),
visitor.metric_type.clone(),
visitor.value,
) {
if let Some(span) = _ctx.lookup_current() {
let ext = span.extensions();
if let Some(data) = ext.get::<(
std::time::Instant,
chrono::DateTime<Utc>,
String,
String,
Vec<(String, String)>,
String,
String,
)>() {
visitor
.attributes
.push(("trace_id".to_string(), data.5.clone()));
visitor
.attributes
.push(("span_id".to_string(), data.6.clone()));
}
}
let attributes = visitor.attributes;
let metric_event = MetricEvent {
name,
description: visitor.description,
unit: visitor.unit,
metric_type: m_type,
value: val,
attributes,
timestamp: Utc::now(),
};
let _ = self.sender.try_send(metric_event);
}
}
}
#[derive(Default)]
struct MetricVisitor {
metric_name: Option<String>,
metric_type: Option<String>,
value: Option<f64>,
unit: Option<String>,
description: Option<String>,
attributes: Vec<(String, String)>,
}
impl tracing::field::Visit for MetricVisitor {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
let key = field.name();
let val = format!("{:?}", value);
let val_str = if val.starts_with('"') && val.ends_with('"') {
val[1..val.len() - 1].to_string()
} else {
val
};
match key {
"metric_name" => self.metric_name = Some(val_str),
"metric_type" => self.metric_type = Some(val_str),
"unit" => self.unit = Some(val_str),
"description" => self.description = Some(val_str),
_ => {
if key != "message" && key != "target" {
self.attributes.push((key.to_string(), val_str));
}
}
}
}
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if field.name() == "value" {
self.value = Some(value);
} else {
self.attributes
.push((field.name().to_string(), value.to_string()));
}
}
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
if field.name() == "value" {
self.value = Some(value as f64);
} else {
self.attributes
.push((field.name().to_string(), value.to_string()));
}
}
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
if field.name() == "value" {
self.value = Some(value as f64);
} else {
self.attributes
.push((field.name().to_string(), value.to_string()));
}
}
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.attributes
.push((field.name().to_string(), value.to_string()));
}
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
match field.name() {
"metric_name" => self.metric_name = Some(value.to_string()),
"metric_type" => self.metric_type = Some(value.to_string()),
"unit" => self.unit = Some(value.to_string()),
"description" => self.description = Some(value.to_string()),
_ => {
if field.name() != "message" && field.name() != "target" {
self.attributes
.push((field.name().to_string(), value.to_string()));
}
}
}
}
}
pub fn start_metrics_flusher(
engine: Arc<MVCCEngine>,
receiver: Receiver<MetricEvent>,
) -> (
Arc<std::sync::atomic::AtomicBool>,
std::thread::JoinHandle<()>,
) {
let shutdown_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
let flag_clone = Arc::clone(&shutdown_flag);
let handle = thread::Builder::new()
.name("oxibase-metrics-flusher".to_string())
.spawn(move || {
IS_METRICS_THREAD.with(|f| *f.borrow_mut() = true);
crate::common::tracing::IS_TELEMETRY_THREAD.with(|f| *f.borrow_mut() = true);
crate::common::logging::IS_LOG_FLUSHER.with(|f| *f.borrow_mut() = true);
let batch_size = 100;
let timeout = Duration::from_secs(1);
let mut last_pool_check = std::time::Instant::now();
let mut last_small_stats = crate::common::buffer_pool::global::small().stats();
let mut last_medium_stats = crate::common::buffer_pool::global::medium().stats();
let mut last_large_stats = crate::common::buffer_pool::global::large().stats();
loop {
if flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
let mut entries = Vec::new();
if let Ok(entry) = receiver.recv_timeout(timeout) {
entries.push(entry);
while entries.len() < batch_size {
match receiver.try_recv() {
Ok(entry) => entries.push(entry),
Err(_) => break,
}
}
}
if last_pool_check.elapsed() >= Duration::from_secs(1) {
last_pool_check = std::time::Instant::now();
let current_small = crate::common::buffer_pool::global::small().stats();
let current_medium = crate::common::buffer_pool::global::medium().stats();
let current_large = crate::common::buffer_pool::global::large().stats();
let pools = [
("small", ¤t_small, &mut last_small_stats),
("medium", ¤t_medium, &mut last_medium_stats),
("large", ¤t_large, &mut last_large_stats),
];
for (pool_name, current_stats, last_stats) in pools {
let gets_delta =
current_stats.get_count.saturating_sub(last_stats.get_count);
let created_delta = current_stats
.buffers_created
.saturating_sub(last_stats.buffers_created);
let misses = created_delta;
let hits = gets_delta.saturating_sub(created_delta);
if hits > 0 {
entries.push(MetricEvent {
name: "buffer_pool_hits".to_string(),
description: Some(format!("Hits for {} buffer pool", pool_name)),
unit: Some("count".to_string()),
metric_type: "counter".to_string(),
value: hits as f64,
attributes: vec![("pool".to_string(), pool_name.to_string())],
timestamp: Utc::now(),
});
}
if misses > 0 {
entries.push(MetricEvent {
name: "buffer_pool_misses".to_string(),
description: Some(format!("Misses for {} buffer pool", pool_name)),
unit: Some("count".to_string()),
metric_type: "counter".to_string(),
value: misses as f64,
attributes: vec![("pool".to_string(), pool_name.to_string())],
timestamp: Utc::now(),
});
}
*last_stats = current_stats.clone();
}
}
if entries.is_empty() {
continue;
}
if let Err(e) = insert_metric_batch(&engine, &entries) {
eprintln!("Failed to flush internal metrics: {}", e);
}
}
})
.expect("Failed to spawn metrics flusher thread");
(shutdown_flag, handle)
}
fn insert_metric_batch(engine: &MVCCEngine, entries: &[MetricEvent]) -> crate::core::Result<()> {
let mut tx = engine.begin_transaction()?;
let mut table = match tx.get_table("system.metrics") {
Ok(t) => t,
Err(_) => {
tx.rollback()?;
return Ok(());
}
};
for entry in entries {
let name_val = Value::Text(entry.name.clone().into());
let desc_val = entry
.description
.clone()
.map_or(Value::Null(crate::core::DataType::Text), |d| {
Value::Text(d.into())
});
let unit_val = entry
.unit
.clone()
.map_or(Value::Null(crate::core::DataType::Text), |u| {
Value::Text(u.into())
});
let type_val = Value::Text(entry.metric_type.clone().into());
let val_val = Value::Float(entry.value);
let attr_val = Value::Text({
let mut map = serde_json::Map::new();
for (k, v) in &entry.attributes {
map.insert(k.clone(), serde_json::Value::String(v.clone()));
}
serde_json::to_string(&map)
.unwrap_or_else(|_| "{}".to_string())
.into()
});
let ts_val = Value::Timestamp(entry.timestamp);
let row = vec![
Value::null_unknown(), name_val, desc_val, unit_val, type_val, val_val, attr_val, ts_val, ];
table.insert(row.into())?;
}
tx.commit()?;
Ok(())
}