rsflow/
metrics.rs

1//! 指标收集模块
2//!
3//! 提供流处理引擎的指标收集和监控功能
4
5use std::sync::Arc;
6use std::time::{Duration, Instant};
7
8use serde::{Deserialize, Serialize};
9
10use crate::Error;
11
12/// 指标类型
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub enum MetricType {
15    /// 计数器(只增不减)
16    Counter,
17    /// 仪表(可增可减)
18    Gauge,
19    /// 计时器(测量持续时间)
20    Timing,
21}
22
23/// 指标接口
24pub trait Metric: Send + Sync {
25    /// 递增计数器
26    fn increment(&self, value: u64);
27    
28    /// 递减计数器(仅适用于Gauge类型)
29    fn decrement(&self, value: u64);
30    
31    /// 设置值
32    fn set(&self, value: u64);
33    
34    /// 记录计时
35    fn timing(&self, duration: Duration);
36    
37    /// 获取指标类型
38    fn metric_type(&self) -> MetricType;
39    
40    /// 获取指标名称
41    fn name(&self) -> &str;
42}
43
44/// 指标收集器接口
45pub trait MetricCollector: Send + Sync {
46    /// 创建或获取计数器
47    fn counter(&self, name: &str) -> Arc<dyn Metric>;
48    
49    /// 创建或获取仪表
50    fn gauge(&self, name: &str) -> Arc<dyn Metric>;
51    
52    /// 创建或获取计时器
53    fn timer(&self, name: &str) -> Arc<dyn Metric>;
54    
55    /// 添加标签
56    fn with_tags(&self, tags: std::collections::HashMap<String, String>) -> Box<dyn MetricCollector>;
57    
58    /// 关闭收集器
59    fn close(&self) -> Result<(), Error>;
60}
61
62/// 空指标实现(不执行任何操作)
63pub struct NoopMetric {
64    name: String,
65    metric_type: MetricType,
66}
67
68impl NoopMetric {
69    /// 创建一个新的空指标
70    pub fn new(name: &str, metric_type: MetricType) -> Self {
71        Self {
72            name: name.to_string(),
73            metric_type,
74        }
75    }
76}
77
78impl Metric for NoopMetric {
79    fn increment(&self, _value: u64) {}
80    fn decrement(&self, _value: u64) {}
81    fn set(&self, _value: u64) {}
82    fn timing(&self, _duration: Duration) {}
83    fn metric_type(&self) -> MetricType { self.metric_type.clone() }
84    fn name(&self) -> &str { &self.name }
85}
86
87/// 空指标收集器实现(不执行任何操作)
88pub struct NoopCollector {}
89
90impl NoopCollector {
91    /// 创建一个新的空指标收集器
92    pub fn new() -> Self {
93        Self {}
94    }
95}
96
97impl MetricCollector for NoopCollector {
98    fn counter(&self, name: &str) -> Arc<dyn Metric> {
99        Arc::new(NoopMetric::new(name, MetricType::Counter))
100    }
101    
102    fn gauge(&self, name: &str) -> Arc<dyn Metric> {
103        Arc::new(NoopMetric::new(name, MetricType::Gauge))
104    }
105    
106    fn timer(&self, name: &str) -> Arc<dyn Metric> {
107        Arc::new(NoopMetric::new(name, MetricType::Timing))
108    }
109    
110    fn with_tags(&self, _tags: std::collections::HashMap<String, String>) -> Box<dyn MetricCollector> {
111        Box::new(Self::new())
112    }
113    
114    fn close(&self) -> Result<(), Error> {
115        Ok(())
116    }
117}
118
119/// Prometheus指标收集器
120pub struct PrometheusCollector {
121    // 在实际实现中,这里应该有一个Prometheus注册表
122    // 例如:registry: prometheus::Registry,
123    prefix: String,
124}
125
126impl PrometheusCollector {
127    /// 创建一个新的Prometheus指标收集器
128    pub fn new(prefix: &str) -> Result<Self, Error> {
129        Ok(Self {
130            // registry: prometheus::Registry::new(),
131            prefix: prefix.to_string(),
132        })
133    }
134}
135
136impl MetricCollector for PrometheusCollector {
137    fn counter(&self, name: &str) -> Arc<dyn Metric> {
138        // 在实际实现中,这里应该创建一个Prometheus计数器
139        // 例如:
140        /*
141        let counter = prometheus::Counter::new(
142            format!("{}{}", self.prefix, name),
143            format!("Counter for {}", name),
144        ).unwrap();
145        self.registry.register(Box::new(counter.clone())).unwrap();
146        Arc::new(PrometheusMetric::new_counter(name, counter))
147        */
148        
149        // 返回一个空指标作为占位符
150        Arc::new(NoopMetric::new(name, MetricType::Counter))
151    }
152    
153    fn gauge(&self, name: &str) -> Arc<dyn Metric> {
154        // 在实际实现中,这里应该创建一个Prometheus仪表
155        // 返回一个空指标作为占位符
156        Arc::new(NoopMetric::new(name, MetricType::Gauge))
157    }
158    
159    fn timer(&self, name: &str) -> Arc<dyn Metric> {
160        // 在实际实现中,这里应该创建一个Prometheus直方图
161        // 返回一个空指标作为占位符
162        Arc::new(NoopMetric::new(name, MetricType::Timing))
163    }
164    
165    fn with_tags(&self, _tags: std::collections::HashMap<String, String>) -> Box<dyn MetricCollector> {
166        // 在实际实现中,这里应该创建一个带有标签的新收集器
167        Box::new(Self {
168            // registry: self.registry.clone(),
169            prefix: self.prefix.clone(),
170        })
171    }
172    
173    fn close(&self) -> Result<(), Error> {
174        // Prometheus收集器不需要特殊的关闭操作
175        Ok(())
176    }
177}
178
179/// 计时辅助结构
180pub struct TimingHelper {
181    metric: Arc<dyn Metric>,
182    start: Instant,
183}
184
185impl TimingHelper {
186    /// 创建一个新的计时辅助结构
187    pub fn new(metric: Arc<dyn Metric>) -> Self {
188        Self {
189            metric,
190            start: Instant::now(),
191        }
192    }
193    
194    /// 完成计时并记录持续时间
195    pub fn done(self) {
196        let duration = self.start.elapsed();
197        self.metric.timing(duration);
198    }
199}
200
201/// 创建指标收集器
202pub fn create_metrics(config: &crate::config::MetricsConfig) -> Result<Box<dyn MetricCollector>, Error> {
203    if !config.enabled {
204        return Ok(Box::new(NoopCollector::new()));
205    }
206    
207    let prefix = config.prefix.as_deref().unwrap_or("").to_string();
208    
209    match config.type_name.as_str() {
210        "prometheus" => {
211            let collector = PrometheusCollector::new(&prefix)?;
212            
213            // 如果有标签,则添加
214            if let Some(tags) = &config.tags {
215                Ok(collector.with_tags(tags.clone()))
216            } else {
217                Ok(Box::new(collector))
218            }
219        },
220        "statsd" => {
221            // 在实际实现中,这里应该创建一个StatsD收集器
222            // 暂时返回一个空收集器
223            Ok(Box::new(NoopCollector::new()))
224        },
225        "none" | "noop" => {
226            Ok(Box::new(NoopCollector::new()))
227        },
228        _ => Err(Error::Config(format!("不支持的指标类型: {}", config.type_name))),
229    }
230}