arkflow 0.1.0

High-performance Rust flow processing engine
Documentation
//! 指标收集模块
//!
//! 提供流处理引擎的指标收集和监控功能

use std::sync::Arc;
use std::time::{Duration, Instant};

use serde::{Deserialize, Serialize};

use crate::Error;

/// 指标类型
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricType {
    /// 计数器(只增不减)
    Counter,
    /// 仪表(可增可减)
    Gauge,
    /// 计时器(测量持续时间)
    Timing,
}

/// 指标接口
pub trait Metric: Send + Sync {
    /// 递增计数器
    fn increment(&self, value: u64);

    /// 递减计数器(仅适用于Gauge类型)
    fn decrement(&self, value: u64);

    /// 设置值
    fn set(&self, value: u64);

    /// 记录计时
    fn timing(&self, duration: Duration);

    /// 获取指标类型
    fn metric_type(&self) -> MetricType;

    /// 获取指标名称
    fn name(&self) -> &str;
}

/// 指标收集器接口
pub trait MetricCollector: Send + Sync {
    /// 创建或获取计数器
    fn counter(&self, name: &str) -> Arc<dyn Metric>;

    /// 创建或获取仪表
    fn gauge(&self, name: &str) -> Arc<dyn Metric>;

    /// 创建或获取计时器
    fn timer(&self, name: &str) -> Arc<dyn Metric>;

    /// 添加标签
    fn with_tags(
        &self,
        tags: std::collections::HashMap<String, String>,
    ) -> Box<dyn MetricCollector>;

    /// 关闭收集器
    fn close(&self) -> Result<(), Error>;
}

/// 空指标实现(不执行任何操作)
pub struct NoopMetric {
    name: String,
    metric_type: MetricType,
}

impl NoopMetric {
    /// 创建一个新的空指标
    pub fn new(name: &str, metric_type: MetricType) -> Self {
        Self {
            name: name.to_string(),
            metric_type,
        }
    }
}

impl Metric for NoopMetric {
    fn increment(&self, _value: u64) {}
    fn decrement(&self, _value: u64) {}
    fn set(&self, _value: u64) {}
    fn timing(&self, _duration: Duration) {}
    fn metric_type(&self) -> MetricType {
        self.metric_type.clone()
    }
    fn name(&self) -> &str {
        &self.name
    }
}

/// 空指标收集器实现(不执行任何操作)
pub struct NoopCollector {}

impl NoopCollector {
    /// 创建一个新的空指标收集器
    pub fn new() -> Self {
        Self {}
    }
}

impl MetricCollector for NoopCollector {
    fn counter(&self, name: &str) -> Arc<dyn Metric> {
        Arc::new(NoopMetric::new(name, MetricType::Counter))
    }

    fn gauge(&self, name: &str) -> Arc<dyn Metric> {
        Arc::new(NoopMetric::new(name, MetricType::Gauge))
    }

    fn timer(&self, name: &str) -> Arc<dyn Metric> {
        Arc::new(NoopMetric::new(name, MetricType::Timing))
    }

    fn with_tags(
        &self,
        _tags: std::collections::HashMap<String, String>,
    ) -> Box<dyn MetricCollector> {
        Box::new(Self::new())
    }

    fn close(&self) -> Result<(), Error> {
        Ok(())
    }
}

/// Prometheus指标收集器
pub struct PrometheusCollector {
    // 在实际实现中,这里应该有一个Prometheus注册表
    // 例如:registry: prometheus::Registry,
    prefix: String,
}

impl PrometheusCollector {
    /// 创建一个新的Prometheus指标收集器
    pub fn new(prefix: &str) -> Result<Self, Error> {
        Ok(Self {
            // registry: prometheus::Registry::new(),
            prefix: prefix.to_string(),
        })
    }
}

impl MetricCollector for PrometheusCollector {
    fn counter(&self, name: &str) -> Arc<dyn Metric> {
        // 在实际实现中,这里应该创建一个Prometheus计数器
        // 例如:
        /*
        let counter = prometheus::Counter::new(
            format!("{}{}", self.prefix, name),
            format!("Counter for {}", name),
        ).unwrap();
        self.registry.register(Box::new(counter.clone())).unwrap();
        Arc::new(PrometheusMetric::new_counter(name, counter))
        */

        // 返回一个空指标作为占位符
        Arc::new(NoopMetric::new(name, MetricType::Counter))
    }

    fn gauge(&self, name: &str) -> Arc<dyn Metric> {
        // 在实际实现中,这里应该创建一个Prometheus仪表
        // 返回一个空指标作为占位符
        Arc::new(NoopMetric::new(name, MetricType::Gauge))
    }

    fn timer(&self, name: &str) -> Arc<dyn Metric> {
        // 在实际实现中,这里应该创建一个Prometheus直方图
        // 返回一个空指标作为占位符
        Arc::new(NoopMetric::new(name, MetricType::Timing))
    }

    fn with_tags(
        &self,
        _tags: std::collections::HashMap<String, String>,
    ) -> Box<dyn MetricCollector> {
        // 在实际实现中,这里应该创建一个带有标签的新收集器
        Box::new(Self {
            // registry: self.registry.clone(),
            prefix: self.prefix.clone(),
        })
    }

    fn close(&self) -> Result<(), Error> {
        // Prometheus收集器不需要特殊的关闭操作
        Ok(())
    }
}

/// 计时辅助结构
pub struct TimingHelper {
    metric: Arc<dyn Metric>,
    start: Instant,
}

impl TimingHelper {
    /// 创建一个新的计时辅助结构
    pub fn new(metric: Arc<dyn Metric>) -> Self {
        Self {
            metric,
            start: Instant::now(),
        }
    }

    /// 完成计时并记录持续时间
    pub fn done(self) {
        let duration = self.start.elapsed();
        self.metric.timing(duration);
    }
}

/// 创建指标收集器
pub fn create_metrics(
    config: &crate::config::MetricsConfig,
) -> Result<Box<dyn MetricCollector>, Error> {
    if !config.enabled {
        return Ok(Box::new(NoopCollector::new()));
    }

    let prefix = config.prefix.as_deref().unwrap_or("").to_string();

    match config.type_name.as_str() {
        "prometheus" => {
            let collector = PrometheusCollector::new(&prefix)?;

            // 如果有标签,则添加
            if let Some(tags) = &config.tags {
                Ok(collector.with_tags(tags.clone()))
            } else {
                Ok(Box::new(collector))
            }
        }
        "statsd" => {
            // 在实际实现中,这里应该创建一个StatsD收集器
            // 暂时返回一个空收集器
            Ok(Box::new(NoopCollector::new()))
        }
        "none" | "noop" => Ok(Box::new(NoopCollector::new())),
        _ => Err(Error::Config(format!(
            "不支持的指标类型: {}",
            config.type_name
        ))),
    }
}