use std::{
collections::BTreeMap,
fs,
path::{Path, PathBuf},
sync::Arc,
};
use dashmap::DashMap;
use parking_lot::RwLock;
use tracing::{debug, warn};
use crate::{
errors::{MetricsError, Result},
labels::Labels,
metric::{MetricType, PrometheusMetric},
};
type MetricKey = String;
pub struct MetricsWriter {
metrics: Arc<DashMap<MetricKey, PrometheusMetric>>,
metric_types: Arc<RwLock<BTreeMap<String, MetricType>>>,
output_path: Arc<PathBuf>,
filename: Arc<String>,
}
impl Clone for MetricsWriter {
fn clone(&self) -> Self {
Self {
metrics: Arc::clone(&self.metrics),
metric_types: Arc::clone(&self.metric_types),
output_path: Arc::clone(&self.output_path),
filename: Arc::clone(&self.filename),
}
}
}
impl MetricsWriter {
pub fn new() -> Result<Self> {
Self::with_path(Self::config_path()?)
}
pub fn with_path(path: impl AsRef<Path>) -> Result<Self> {
Self::with_path_and_filename(path, "metrics.prom")
}
pub fn with_path_and_filename(
path: impl AsRef<Path>,
filename: impl Into<String>,
) -> Result<Self> {
let path = path.as_ref();
fs::create_dir_all(path).map_err(|e| {
MetricsError::PathError(format!("Failed to create metrics directory: {}", e))
})?;
Ok(Self {
metrics: Arc::new(DashMap::new()),
metric_types: Arc::new(RwLock::new(BTreeMap::new())),
output_path: Arc::new(path.to_path_buf()),
filename: Arc::new(filename.into()),
})
}
pub fn output_path(&self) -> &Path {
&self.output_path
}
pub fn counter(
&self,
name: impl Into<String>,
labels: impl Into<Labels>,
delta: f64,
) -> Result<()> {
self.update_metric(name, labels, MetricType::Counter, delta, false)
}
pub fn gauge(
&self,
name: impl Into<String>,
labels: impl Into<Labels>,
value: f64,
) -> Result<()> {
self.update_metric(name, labels, MetricType::Gauge, value, true)
}
pub fn inc(&self, name: impl Into<String>, labels: impl Into<Labels>) -> Result<()> {
self.counter(name, labels, 1.0)
}
pub fn set(
&self,
name: impl Into<String>,
labels: impl Into<Labels>,
value: f64,
) -> Result<()> {
self.gauge(name, labels, value)
}
pub async fn flush(&self) -> Result<()> {
let snapshot = {
debug!("Taking metrics snapshot");
let mut metrics_by_name: BTreeMap<String, Vec<PrometheusMetric>> = BTreeMap::new();
for entry in self.metrics.iter() {
let metric = entry.value().clone();
metrics_by_name
.entry(metric.name.clone())
.or_insert_with(Vec::new)
.push(metric);
}
metrics_by_name
};
self.write_snapshot(snapshot).await
}
pub fn len(&self) -> usize {
self.metrics.len()
}
pub fn is_empty(&self) -> bool {
self.metrics.is_empty()
}
#[cfg(test)]
pub fn clear(&self) {
self.metrics.clear();
self.metric_types.write().clear();
}
fn update_metric(
&self,
name: impl Into<String>,
labels: impl Into<Labels>,
metric_type: MetricType,
value: f64,
is_set: bool,
) -> Result<()> {
let name_str = name.into();
let labels = labels.into();
Self::validate_name(&name_str)?;
Self::validate_value(value)?;
let key = Self::make_key(&name_str, &labels);
{
let mut types = self.metric_types.write();
types.entry(name_str.clone()).or_insert(metric_type);
}
self.metrics
.entry(key)
.and_modify(|m| {
if is_set {
m.value = value;
} else {
m.value += value;
}
})
.or_insert_with(|| {
PrometheusMetric::new(&name_str, metric_type, labels.clone(), value)
});
Ok(())
}
async fn write_snapshot(
&self,
snapshot: BTreeMap<String, Vec<PrometheusMetric>>,
) -> Result<()> {
let output_path = self.output_path.as_ref();
let mut content = String::new();
let types = self.metric_types.read().clone();
for (metric_name, metric_type) in types {
if let Some(metrics) = snapshot.get(&metric_name) {
if !metrics.is_empty() {
content.push_str(&format!("# TYPE {} {}\n", metric_name, metric_type));
let mut sorted_metrics = metrics.clone();
sorted_metrics.sort();
for metric in sorted_metrics {
if metric.is_valid() {
content.push_str(&metric.to_prometheus_line());
content.push('\n');
} else {
warn!("Skipping invalid metric: {}", metric.name);
}
}
}
}
}
let file_path = output_path.join(self.filename.as_str());
fs::write(&file_path, content).map_err(MetricsError::IoError)?;
debug!("Wrote metrics to {}", file_path.display());
Ok(())
}
fn config_path() -> Result<PathBuf> {
let path = std::env::var("METRICS_TEXTFILE_PATH")
.unwrap_or_else(|_| "/var/lib/node_exporter/textfile_collector/".to_string());
Ok(PathBuf::from(path))
}
fn validate_name(name: &str) -> Result<()> {
if name.is_empty() {
return Err(MetricsError::InvalidName(
"Metric name cannot be empty".to_string(),
));
}
if !name.chars().next().unwrap().is_alphabetic() && name.chars().next() != Some('_') {
return Err(MetricsError::InvalidName(format!(
"Metric name must start with letter or underscore: {}",
name
)));
}
for ch in name.chars() {
if !ch.is_alphanumeric() && ch != '_' && ch != ':' {
return Err(MetricsError::InvalidName(format!(
"Invalid character in metric name '{}': {}",
name, ch
)));
}
}
Ok(())
}
fn validate_value(value: f64) -> Result<()> {
if !value.is_finite() {
return Err(MetricsError::InvalidValue(format!(
"Metric value must be finite, got: {}",
value
)));
}
Ok(())
}
fn make_key(name: &str, labels: &Labels) -> MetricKey {
let labels_str = labels.to_string();
if labels_str.is_empty() {
name.to_string()
} else {
format!("{}:{}", name, labels_str)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_name_valid() {
assert!(MetricsWriter::validate_name("requests_total").is_ok());
assert!(MetricsWriter::validate_name("_internal_metric").is_ok());
assert!(MetricsWriter::validate_name("metric123").is_ok());
assert!(MetricsWriter::validate_name("metric:subsystem").is_ok());
}
#[test]
fn test_validate_name_invalid() {
assert!(MetricsWriter::validate_name("").is_err());
assert!(MetricsWriter::validate_name("123invalid").is_err());
assert!(MetricsWriter::validate_name("invalid-name").is_err());
}
#[test]
fn test_validate_value_valid() {
assert!(MetricsWriter::validate_value(0.0).is_ok());
assert!(MetricsWriter::validate_value(42.5).is_ok());
assert!(MetricsWriter::validate_value(-100.0).is_ok());
}
#[test]
fn test_validate_value_invalid() {
assert!(MetricsWriter::validate_value(f64::NAN).is_err());
assert!(MetricsWriter::validate_value(f64::INFINITY).is_err());
assert!(MetricsWriter::validate_value(f64::NEG_INFINITY).is_err());
}
#[tokio::test]
async fn test_writer_creation() {
let writer = MetricsWriter::new();
assert!(writer.is_ok());
}
#[tokio::test]
async fn test_counter_update() -> Result<()> {
let writer = MetricsWriter::with_path("/tmp/metrics_test")?;
writer.clear();
writer.counter("test_counter", Vec::<(String, String)>::new(), 5.0)?;
assert_eq!(writer.len(), 1);
writer.counter("test_counter", Vec::<(String, String)>::new(), 3.0)?;
assert_eq!(writer.len(), 1);
Ok(())
}
#[tokio::test]
async fn test_gauge_update() -> Result<()> {
let writer = MetricsWriter::with_path("/tmp/metrics_test")?;
writer.clear();
writer.gauge("test_gauge", Vec::<(String, String)>::new(), 42.0)?;
assert_eq!(writer.len(), 1);
writer.gauge("test_gauge", Vec::<(String, String)>::new(), 99.0)?;
assert_eq!(writer.len(), 1);
Ok(())
}
#[tokio::test]
async fn test_multiple_labels() -> Result<()> {
let writer = MetricsWriter::with_path("/tmp/metrics_test")?;
writer.clear();
let labels1: Vec<(String, String)> = vec![("method".to_string(), "GET".to_string())];
let labels2: Vec<(String, String)> = vec![("method".to_string(), "POST".to_string())];
writer.counter("requests_total", labels1, 10.0)?;
writer.counter("requests_total", labels2, 5.0)?;
assert_eq!(writer.len(), 2);
Ok(())
}
#[tokio::test]
async fn test_flush() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let writer = MetricsWriter::with_path(temp_dir.path())?;
writer.clear();
writer.counter("test_counter", Vec::<(String, String)>::new(), 42.0)?;
writer.gauge("test_gauge", Vec::<(String, String)>::new(), 21.5)?;
writer.flush().await?;
let content = fs::read_to_string(temp_dir.path().join("metrics.prom"))?;
assert!(content.contains("test_counter"));
assert!(content.contains("test_gauge"));
assert!(content.contains("42"));
assert!(content.contains("21.5"));
Ok(())
}
#[tokio::test]
async fn test_flush_with_labels() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let writer = MetricsWriter::with_path(temp_dir.path())?;
writer.clear();
let labels: Vec<(String, String)> = vec![("method".to_string(), "GET".to_string())];
writer.counter("requests_total", labels, 100.0)?;
writer.flush().await?;
let content = fs::read_to_string(temp_dir.path().join("metrics.prom"))?;
assert!(content.contains("requests_total{method=\"GET\"}"));
assert!(content.contains("100"));
Ok(())
}
#[test]
fn test_make_key_no_labels() {
let key = MetricsWriter::make_key("metric_name", &Labels::new());
assert_eq!(key, "metric_name");
}
#[test]
fn test_make_key_with_labels() {
let labels = Labels::from(vec![("method".to_string(), "GET".to_string())]);
let key1 = MetricsWriter::make_key("requests", &labels);
let key2 = MetricsWriter::make_key("requests", &Labels::new());
assert_ne!(key1, key2);
}
#[tokio::test]
async fn test_counter_increment() -> Result<()> {
let writer = MetricsWriter::with_path("/tmp/metrics_test")?;
writer.clear();
writer.inc("test_metric", Vec::<(String, String)>::new())?;
writer.inc("test_metric", Vec::<(String, String)>::new())?;
writer.inc("test_metric", Vec::<(String, String)>::new())?;
assert_eq!(writer.len(), 1);
Ok(())
}
}