1use std::sync::{Arc, Mutex};
42use std::time::{Duration, Instant};
43
/// An event delivered to every subscriber of a [`MetricStream`].
///
/// The first field of each variant is the metric name the event refers to.
#[derive(Clone, Debug, PartialEq)]
pub enum MetricEvent {
    /// A counter publication: metric name and the published value.
    Counter(String, u64),
    /// A gauge publication: metric name and the published value.
    Gauge(String, f64),
    /// A timing publication: metric name and a duration. The publishing
    /// method names this argument `duration_us`, so it is presumably in
    /// microseconds.
    Timing(String, u64),
    /// A gauge crossed a registered alert: metric name, the offending gauge
    /// value, and the threshold that was crossed.
    ThresholdExceeded(String, f64, f64),
}
56
/// Shared, thread-safe handle to a subscriber callback.
///
/// The callback receives each emitted [`MetricEvent`] by reference. The
/// `Send + Sync` bounds allow the handle to be stored behind a mutex and
/// used from any thread.
pub type MetricCallback = Arc<dyn Fn(&MetricEvent) + Send + Sync>;
59
/// Publishes metric events to registered subscriber callbacks.
///
/// Publications are rate limited per metric name, and gauge publications are
/// additionally checked against the registered threshold alerts.
pub struct MetricStream {
    /// Callbacks that receive every emitted event, in registration order.
    subscribers: Arc<Mutex<Vec<MetricCallback>>>,
    /// Alerts evaluated against each gauge publication that is not rate limited.
    thresholds: Arc<Mutex<Vec<ThresholdAlert>>>,
    /// Per-metric-name throttle consulted before anything is emitted.
    rate_limiter: Arc<Mutex<RateLimiter>>,
}
69
/// A rule that raises [`MetricEvent::ThresholdExceeded`] when a matching
/// gauge value crosses `threshold`.
#[derive(Clone, Debug)]
pub struct ThresholdAlert {
    /// Substring looked for in the gauge name (matched with `str::contains`,
    /// not a glob or regex).
    pub metric_pattern: String,
    /// Boundary value the gauge is compared against (strict comparison).
    pub threshold: f64,
    /// `true`: alert when the value is greater than `threshold`;
    /// `false`: alert when the value is less than `threshold`.
    pub above: bool,
}
80
/// Per-key throttle that accepts at most one emission per `min_interval`.
struct RateLimiter {
    /// Time of the most recent *accepted* emission for each key.
    last_emit: std::collections::HashMap<String, Instant>,
    /// Minimum time that must elapse between two accepted emissions of a key.
    min_interval: Duration,
}

impl RateLimiter {
    /// Creates a limiter with no recorded keys and the given window.
    fn new(min_interval: Duration) -> Self {
        Self {
            last_emit: std::collections::HashMap::new(),
            min_interval,
        }
    }

    /// Returns `true` and records the current time if `key` may be emitted
    /// now; returns `false` if its last accepted emission was less than
    /// `min_interval` ago.
    ///
    /// A rejected call leaves the recorded timestamp untouched, so the window
    /// is measured from the last accepted emission rather than the last
    /// attempt.
    fn should_emit(&mut self, key: &str) -> bool {
        let now = Instant::now();
        // One lookup for known keys; the key string is only allocated the
        // first time a key is seen.
        match self.last_emit.get_mut(key) {
            Some(last) if now.duration_since(*last) < self.min_interval => false,
            Some(last) => {
                *last = now;
                true
            }
            None => {
                self.last_emit.insert(key.to_owned(), now);
                true
            }
        }
    }
}
108
109impl MetricStream {
110 pub fn new() -> Self {
112 Self {
113 subscribers: Arc::new(Mutex::new(Vec::new())),
114 thresholds: Arc::new(Mutex::new(Vec::new())),
115 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(Duration::from_millis(100)))),
116 }
117 }
118
119 pub fn with_rate_limit(min_interval: Duration) -> Self {
121 Self {
122 subscribers: Arc::new(Mutex::new(Vec::new())),
123 thresholds: Arc::new(Mutex::new(Vec::new())),
124 rate_limiter: Arc::new(Mutex::new(RateLimiter::new(min_interval))),
125 }
126 }
127
128 pub fn subscribe<F>(&mut self, callback: F)
130 where
131 F: Fn(&MetricEvent) + Send + Sync + 'static,
132 {
133 let mut subscribers = self.subscribers.lock().unwrap();
134 subscribers.push(Arc::new(callback));
135 }
136
137 pub fn add_threshold_alert(&mut self, metric: impl Into<String>, threshold: f64, above: bool) {
139 let mut thresholds = self.thresholds.lock().unwrap();
140 thresholds.push(ThresholdAlert {
141 metric_pattern: metric.into(),
142 threshold,
143 above,
144 });
145 }
146
147 pub fn publish_counter(&self, name: impl Into<String>, value: u64) {
149 let name = name.into();
150 if !self.should_emit(&name) {
151 return;
152 }
153
154 let event = MetricEvent::Counter(name, value);
155 self.emit(&event);
156 }
157
158 pub fn publish_gauge(&self, name: impl Into<String>, value: f64) {
160 let name = name.into();
161 if !self.should_emit(&name) {
162 return;
163 }
164
165 let event = MetricEvent::Gauge(name.clone(), value);
166 self.emit(&event);
167 self.check_thresholds(&name, value);
168 }
169
170 pub fn publish_timing(&self, name: impl Into<String>, duration_us: u64) {
172 let name = name.into();
173 if !self.should_emit(&name) {
174 return;
175 }
176
177 let event = MetricEvent::Timing(name, duration_us);
178 self.emit(&event);
179 }
180
181 fn emit(&self, event: &MetricEvent) {
183 let subscribers = self.subscribers.lock().unwrap();
184 for callback in subscribers.iter() {
185 callback(event);
186 }
187 }
188
189 fn should_emit(&self, key: &str) -> bool {
191 let mut limiter = self.rate_limiter.lock().unwrap();
192 limiter.should_emit(key)
193 }
194
195 fn check_thresholds(&self, name: &str, value: f64) {
197 let thresholds = self.thresholds.lock().unwrap();
198
199 for alert in thresholds.iter() {
200 if name.contains(&alert.metric_pattern) {
201 let exceeded = if alert.above {
202 value > alert.threshold
203 } else {
204 value < alert.threshold
205 };
206
207 if exceeded {
208 let event =
209 MetricEvent::ThresholdExceeded(name.to_string(), value, alert.threshold);
210 drop(thresholds); self.emit(&event);
212 break;
213 }
214 }
215 }
216 }
217
218 pub fn subscriber_count(&self) -> usize {
220 self.subscribers.lock().unwrap().len()
221 }
222
223 pub fn clear_subscribers(&mut self) {
225 let mut subscribers = self.subscribers.lock().unwrap();
226 subscribers.clear();
227 }
228}
229
230impl Default for MetricStream {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
// Subscriber callbacks are invoked synchronously inside the `publish_*`
// calls, so these tests assert immediately after publishing instead of
// sleeping.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Stream whose rate limiter never suppresses anything, for tests that
    /// publish the same metric name more than once.
    fn unthrottled_stream() -> MetricStream {
        MetricStream::with_rate_limit(Duration::from_millis(0))
    }

    #[test]
    fn test_subscribe_and_publish() {
        let mut stream = MetricStream::new();
        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        stream.subscribe(move |event| {
            if matches!(event, MetricEvent::Counter(_, _)) {
                counter_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        stream.publish_counter("test", 42);

        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_multiple_subscribers() {
        let mut stream = MetricStream::new();
        let count1 = Arc::new(AtomicU64::new(0));
        let count2 = Arc::new(AtomicU64::new(0));

        let c1 = count1.clone();
        let c2 = count2.clone();

        stream.subscribe(move |_| {
            c1.fetch_add(1, Ordering::Relaxed);
        });

        stream.subscribe(move |_| {
            c2.fetch_add(1, Ordering::Relaxed);
        });

        stream.publish_counter("test", 1);

        assert_eq!(count1.load(Ordering::Relaxed), 1);
        assert_eq!(count2.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_threshold_alert() {
        // The same gauge name is published twice back to back, so the rate
        // limiter is disabled rather than waited out.
        let mut stream = unthrottled_stream();
        stream.add_threshold_alert("cpu", 80.0, true);

        let alerted = Arc::new(AtomicU64::new(0));
        let alerted_clone = alerted.clone();

        stream.subscribe(move |event| {
            if matches!(event, MetricEvent::ThresholdExceeded(_, _, _)) {
                alerted_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        stream.publish_gauge("cpu_usage", 50.0);
        assert_eq!(alerted.load(Ordering::Relaxed), 0);

        stream.publish_gauge("cpu_usage", 85.0);
        assert_eq!(alerted.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_rate_limiting() {
        // A window far longer than the test run makes the outcome exact:
        // only the first publication of a name gets through.
        let mut stream = MetricStream::with_rate_limit(Duration::from_secs(60));
        let count = Arc::new(AtomicU64::new(0));
        let count_clone = count.clone();

        stream.subscribe(move |_| {
            count_clone.fetch_add(1, Ordering::Relaxed);
        });

        for _ in 0..10 {
            stream.publish_counter("test", 1);
        }
        assert_eq!(count.load(Ordering::Relaxed), 1);

        // A different metric name has its own window.
        stream.publish_counter("other", 1);
        assert_eq!(count.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_subscriber_count() {
        let mut stream = MetricStream::new();
        assert_eq!(stream.subscriber_count(), 0);

        stream.subscribe(|_| {});
        assert_eq!(stream.subscriber_count(), 1);

        stream.subscribe(|_| {});
        assert_eq!(stream.subscriber_count(), 2);

        stream.clear_subscribers();
        assert_eq!(stream.subscriber_count(), 0);
    }

    #[test]
    fn test_gauge_and_timing() {
        let mut stream = MetricStream::new();
        let events = Arc::new(Mutex::new(Vec::new()));
        let events_clone = events.clone();

        stream.subscribe(move |event| {
            events_clone.lock().unwrap().push(event.clone());
        });

        stream.publish_gauge("memory", 1024.5);
        stream.publish_timing("query", 1500);

        let recorded = events.lock().unwrap();
        assert_eq!(
            *recorded,
            vec![
                MetricEvent::Gauge("memory".to_string(), 1024.5),
                MetricEvent::Timing("query".to_string(), 1500),
            ]
        );
    }
}