Skip to main content

embeddenator_obs/obs/
streaming.rs

1//! Real-Time Metric Streaming
2//!
3//! Provides callback-based real-time streaming of observability metrics
4//! for live monitoring, alerting, and reactive systems.
5//!
6//! # Features
7//!
8//! - Callback-based metric updates
9//! - Threshold-based alerting
10//! - Metric change detection
11//! - Rate limiting for high-frequency metrics
12//! - Multiple subscriber support
13//!
14//! # Usage
15//!
16//! ```rust,ignore
17//! use embeddenator_obs::streaming::{MetricStream, MetricEvent};
18//!
19//! let mut stream = MetricStream::new();
20//!
21//! // Subscribe to metric updates
22//! stream.subscribe(|event| {
23//!     match event {
24//!         MetricEvent::Counter(name, value) => {
25//!             println!("Counter {}: {}", name, value);
26//!         }
27//!         MetricEvent::Gauge(name, value) => {
28//!             if value > 100.0 {
29//!                 alert!("High gauge value");
30//!             }
31//!         }
32//!         _ => {}
33//!     }
34//! });
35//!
36//! // Publish metrics
37//! stream.publish_counter("requests", 42);
38//! stream.publish_gauge("cpu_usage", 75.5);
39//! ```
40
41use std::sync::{Arc, Mutex};
42use std::time::{Duration, Instant};
43
44/// Type of metric event.
45#[derive(Debug, Clone, PartialEq)]
46pub enum MetricEvent {
47    /// Counter metric (name, value)
48    Counter(String, u64),
49    /// Gauge metric (name, value)
50    Gauge(String, f64),
51    /// Timing metric (name, duration_us)
52    Timing(String, u64),
53    /// Threshold exceeded (metric, value, threshold)
54    ThresholdExceeded(String, f64, f64),
55}
56
57/// Metric subscriber callback.
58pub type MetricCallback = Arc<dyn Fn(&MetricEvent) + Send + Sync>;
59
60/// Real-time metric streaming system.
61pub struct MetricStream {
62    /// Active subscribers
63    subscribers: Arc<Mutex<Vec<MetricCallback>>>,
64    /// Threshold alerts
65    thresholds: Arc<Mutex<Vec<ThresholdAlert>>>,
66    /// Rate limiter state
67    rate_limiter: Arc<Mutex<RateLimiter>>,
68}
69
70/// Threshold-based alert configuration.
71#[derive(Debug, Clone)]
72pub struct ThresholdAlert {
73    /// Metric name pattern
74    pub metric_pattern: String,
75    /// Threshold value
76    pub threshold: f64,
77    /// Alert when above (true) or below (false)
78    pub above: bool,
79}
80
81/// Rate limiter to prevent callback flooding.
82struct RateLimiter {
83    /// Last emit time per metric
84    last_emit: std::collections::HashMap<String, Instant>,
85    /// Minimum interval between emits
86    min_interval: Duration,
87}
88
89impl RateLimiter {
90    fn new(min_interval: Duration) -> Self {
91        Self {
92            last_emit: std::collections::HashMap::new(),
93            min_interval,
94        }
95    }
96
97    fn should_emit(&mut self, key: &str) -> bool {
98        let now = Instant::now();
99        if let Some(last) = self.last_emit.get(key) {
100            if now.duration_since(*last) < self.min_interval {
101                return false;
102            }
103        }
104        self.last_emit.insert(key.to_string(), now);
105        true
106    }
107}
108
109impl MetricStream {
110    /// Create new metric stream.
111    pub fn new() -> Self {
112        Self {
113            subscribers: Arc::new(Mutex::new(Vec::new())),
114            thresholds: Arc::new(Mutex::new(Vec::new())),
115            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(Duration::from_millis(100)))),
116        }
117    }
118
119    /// Create with custom rate limit.
120    pub fn with_rate_limit(min_interval: Duration) -> Self {
121        Self {
122            subscribers: Arc::new(Mutex::new(Vec::new())),
123            thresholds: Arc::new(Mutex::new(Vec::new())),
124            rate_limiter: Arc::new(Mutex::new(RateLimiter::new(min_interval))),
125        }
126    }
127
128    /// Subscribe to metric events.
129    pub fn subscribe<F>(&mut self, callback: F)
130    where
131        F: Fn(&MetricEvent) + Send + Sync + 'static,
132    {
133        let mut subscribers = self.subscribers.lock().unwrap();
134        subscribers.push(Arc::new(callback));
135    }
136
137    /// Add threshold alert.
138    pub fn add_threshold_alert(&mut self, metric: impl Into<String>, threshold: f64, above: bool) {
139        let mut thresholds = self.thresholds.lock().unwrap();
140        thresholds.push(ThresholdAlert {
141            metric_pattern: metric.into(),
142            threshold,
143            above,
144        });
145    }
146
147    /// Publish counter metric.
148    pub fn publish_counter(&self, name: impl Into<String>, value: u64) {
149        let name = name.into();
150        if !self.should_emit(&name) {
151            return;
152        }
153
154        let event = MetricEvent::Counter(name, value);
155        self.emit(&event);
156    }
157
158    /// Publish gauge metric.
159    pub fn publish_gauge(&self, name: impl Into<String>, value: f64) {
160        let name = name.into();
161        if !self.should_emit(&name) {
162            return;
163        }
164
165        let event = MetricEvent::Gauge(name.clone(), value);
166        self.emit(&event);
167        self.check_thresholds(&name, value);
168    }
169
170    /// Publish timing metric.
171    pub fn publish_timing(&self, name: impl Into<String>, duration_us: u64) {
172        let name = name.into();
173        if !self.should_emit(&name) {
174            return;
175        }
176
177        let event = MetricEvent::Timing(name, duration_us);
178        self.emit(&event);
179    }
180
181    /// Emit event to all subscribers.
182    fn emit(&self, event: &MetricEvent) {
183        let subscribers = self.subscribers.lock().unwrap();
184        for callback in subscribers.iter() {
185            callback(event);
186        }
187    }
188
189    /// Check if rate limiter allows emission.
190    fn should_emit(&self, key: &str) -> bool {
191        let mut limiter = self.rate_limiter.lock().unwrap();
192        limiter.should_emit(key)
193    }
194
195    /// Check threshold alerts for a metric.
196    fn check_thresholds(&self, name: &str, value: f64) {
197        let thresholds = self.thresholds.lock().unwrap();
198
199        for alert in thresholds.iter() {
200            if name.contains(&alert.metric_pattern) {
201                let exceeded = if alert.above {
202                    value > alert.threshold
203                } else {
204                    value < alert.threshold
205                };
206
207                if exceeded {
208                    let event =
209                        MetricEvent::ThresholdExceeded(name.to_string(), value, alert.threshold);
210                    drop(thresholds); // Release lock before emitting
211                    self.emit(&event);
212                    break;
213                }
214            }
215        }
216    }
217
218    /// Get subscriber count.
219    pub fn subscriber_count(&self) -> usize {
220        self.subscribers.lock().unwrap().len()
221    }
222
223    /// Clear all subscribers.
224    pub fn clear_subscribers(&mut self) {
225        let mut subscribers = self.subscribers.lock().unwrap();
226        subscribers.clear();
227    }
228}
229
230impl Default for MetricStream {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239    use std::sync::atomic::{AtomicU64, Ordering};
240
241    #[test]
242    fn test_subscribe_and_publish() {
243        let mut stream = MetricStream::new();
244        let counter = Arc::new(AtomicU64::new(0));
245        let counter_clone = counter.clone();
246
247        stream.subscribe(move |event| {
248            if matches!(event, MetricEvent::Counter(_, _)) {
249                counter_clone.fetch_add(1, Ordering::Relaxed);
250            }
251        });
252
253        stream.publish_counter("test", 42);
254
255        // Give some time for callback
256        std::thread::sleep(Duration::from_millis(10));
257        assert_eq!(counter.load(Ordering::Relaxed), 1);
258    }
259
260    #[test]
261    fn test_multiple_subscribers() {
262        let mut stream = MetricStream::new();
263        let count1 = Arc::new(AtomicU64::new(0));
264        let count2 = Arc::new(AtomicU64::new(0));
265
266        let c1 = count1.clone();
267        let c2 = count2.clone();
268
269        stream.subscribe(move |_| {
270            c1.fetch_add(1, Ordering::Relaxed);
271        });
272
273        stream.subscribe(move |_| {
274            c2.fetch_add(1, Ordering::Relaxed);
275        });
276
277        stream.publish_counter("test", 1);
278
279        std::thread::sleep(Duration::from_millis(10));
280        assert_eq!(count1.load(Ordering::Relaxed), 1);
281        assert_eq!(count2.load(Ordering::Relaxed), 1);
282    }
283
284    #[test]
285    fn test_threshold_alert() {
286        let mut stream = MetricStream::new();
287        stream.add_threshold_alert("cpu", 80.0, true);
288
289        let alerted = Arc::new(AtomicU64::new(0));
290        let alerted_clone = alerted.clone();
291
292        stream.subscribe(move |event| {
293            if matches!(event, MetricEvent::ThresholdExceeded(_, _, _)) {
294                alerted_clone.fetch_add(1, Ordering::Relaxed);
295            }
296        });
297
298        stream.publish_gauge("cpu_usage", 50.0); // No alert
299        std::thread::sleep(Duration::from_millis(10));
300        assert_eq!(alerted.load(Ordering::Relaxed), 0);
301
302        // Wait to bypass rate limiter (default 100ms)
303        std::thread::sleep(Duration::from_millis(110));
304
305        stream.publish_gauge("cpu_usage", 85.0); // Alert!
306        std::thread::sleep(Duration::from_millis(10));
307        assert_eq!(alerted.load(Ordering::Relaxed), 1);
308    }
309
310    #[test]
311    fn test_rate_limiting() {
312        let stream = MetricStream::with_rate_limit(Duration::from_millis(50));
313        let count = Arc::new(AtomicU64::new(0));
314        let count_clone = count.clone();
315
316        let mut stream_mut = stream;
317        stream_mut.subscribe(move |_| {
318            count_clone.fetch_add(1, Ordering::Relaxed);
319        });
320
321        // Rapid fire - should be rate limited
322        for _ in 0..10 {
323            stream_mut.publish_counter("test", 1);
324        }
325
326        std::thread::sleep(Duration::from_millis(10));
327        // Should only receive 1 due to rate limiting
328        assert!(count.load(Ordering::Relaxed) < 10);
329    }
330
331    #[test]
332    fn test_subscriber_count() {
333        let mut stream = MetricStream::new();
334        assert_eq!(stream.subscriber_count(), 0);
335
336        stream.subscribe(|_| {});
337        assert_eq!(stream.subscriber_count(), 1);
338
339        stream.subscribe(|_| {});
340        assert_eq!(stream.subscriber_count(), 2);
341
342        stream.clear_subscribers();
343        assert_eq!(stream.subscriber_count(), 0);
344    }
345
346    #[test]
347    fn test_gauge_and_timing() {
348        let mut stream = MetricStream::new();
349        let events = Arc::new(Mutex::new(Vec::new()));
350        let events_clone = events.clone();
351
352        stream.subscribe(move |event| {
353            events_clone.lock().unwrap().push(event.clone());
354        });
355
356        stream.publish_gauge("memory", 1024.5);
357        stream.publish_timing("query", 1500);
358
359        std::thread::sleep(Duration::from_millis(10));
360        let recorded = events.lock().unwrap();
361        assert_eq!(recorded.len(), 2);
362    }
363}