llm_optimizer_collector/
telemetry.rs

//! OpenTelemetry Integration
//!
//! This module provides OpenTelemetry metrics integration for
//! comprehensive observability of the feedback collection pipeline.
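//!
//! # Usage
//!
//! A minimal lifecycle sketch (the crate path is assumed from this file's
//! location; `UserFeedbackEvent::new` is the constructor used in the tests
//! at the bottom of this module):
//!
//! ```no_run
//! use llm_optimizer_collector::telemetry::{TelemetryConfig, TelemetryProvider};
//! use llm_optimizer_collector::feedback_events::{FeedbackEvent, UserFeedbackEvent};
//! use uuid::Uuid;
//!
//! let provider = TelemetryProvider::init(TelemetryConfig::default()).unwrap();
//! let metrics = provider.metrics();
//!
//! // Record a single event and its processing latency.
//! let event = FeedbackEvent::UserFeedback(UserFeedbackEvent::new("session1", Uuid::new_v4()));
//! metrics.record_event(&event);
//! metrics.record_processing_latency(12.5, "user_feedback");
//!
//! // Flush and shut down the meter provider on exit.
//! provider.shutdown().unwrap();
//! ```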

use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider as _, UpDownCounter};
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::Resource;
use std::sync::Arc;
use thiserror::Error;

use crate::feedback_events::FeedbackEvent;

/// Telemetry error types
#[derive(Error, Debug)]
pub enum TelemetryError {
    #[error("Telemetry initialization error: {0}")]
    InitializationError(String),

    #[error("Metrics export error: {0}")]
    ExportError(String),

    #[error(transparent)]
    OpenTelemetryError(#[from] opentelemetry::metrics::MetricsError),
}

pub type Result<T> = std::result::Result<T, TelemetryError>;

/// Telemetry metrics collector
pub struct TelemetryMetrics {
    /// Meter for creating instruments
    meter: Meter,
    /// Event counter
    event_counter: Counter<u64>,
    /// Event processing latency
    processing_latency: Histogram<f64>,
    /// Active collectors
    active_collectors: UpDownCounter<i64>,
    /// Event batch size
    batch_size: Histogram<u64>,
    /// Error counter
    error_counter: Counter<u64>,
    /// Kafka publish latency
    kafka_publish_latency: Histogram<f64>,
    /// Kafka publish errors
    kafka_errors: Counter<u64>,
    /// Rate limit checks counter
    rate_limit_checks: Counter<u64>,
    /// Rate limit current usage gauge
    rate_limit_usage: UpDownCounter<i64>,
    /// Active sources being tracked
    rate_limit_active_sources: UpDownCounter<i64>,
}

impl TelemetryMetrics {
    /// Create new telemetry metrics
    pub fn new(meter: Meter) -> Result<Self> {
        let event_counter = meter
            .u64_counter("feedback_events_total")
            .with_description("Total number of feedback events received")
            .with_unit("events")
            .init();

        let processing_latency = meter
            .f64_histogram("feedback_processing_duration_ms")
            .with_description("Time taken to process feedback events")
            .with_unit("ms")
            .init();

        let active_collectors = meter
            .i64_up_down_counter("active_feedback_collectors")
            .with_description("Number of active feedback collectors")
            .init();

        let batch_size = meter
            .u64_histogram("feedback_batch_size")
            .with_description("Number of events in each batch")
            .with_unit("events")
            .init();

        let error_counter = meter
            .u64_counter("feedback_errors_total")
            .with_description("Total number of feedback processing errors")
            .with_unit("errors")
            .init();

        let kafka_publish_latency = meter
            .f64_histogram("kafka_publish_duration_ms")
            .with_description("Time taken to publish to Kafka")
            .with_unit("ms")
            .init();

        let kafka_errors = meter
            .u64_counter("kafka_errors_total")
            .with_description("Total number of Kafka publish errors")
            .with_unit("errors")
            .init();

        let rate_limit_checks = meter
            .u64_counter("rate_limit_checks_total")
            .with_description("Total number of rate limit checks")
            .with_unit("checks")
            .init();

        let rate_limit_usage = meter
            .i64_up_down_counter("rate_limit_current_usage")
            .with_description("Current rate limit usage")
            .init();

        let rate_limit_active_sources = meter
            .i64_up_down_counter("rate_limit_active_sources")
            .with_description("Number of active sources being tracked")
            .init();

        Ok(Self {
            meter,
            event_counter,
            processing_latency,
            active_collectors,
            batch_size,
            error_counter,
            kafka_publish_latency,
            kafka_errors,
            rate_limit_checks,
            rate_limit_usage,
            rate_limit_active_sources,
        })
    }

    /// Record event received
    pub fn record_event(&self, event: &FeedbackEvent) {
        let event_type = event.event_type();
        self.event_counter.add(1, &[KeyValue::new("event_type", event_type)]);
    }

    /// Record processing latency
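    ///
    /// A sketch of timing a handler with `std::time::Instant` (the crate path
    /// below is assumed from this file's location); note the histogram unit
    /// is milliseconds, hence the `* 1000.0` conversion:
    ///
    /// ```no_run
    /// # use llm_optimizer_collector::telemetry::{TelemetryConfig, TelemetryProvider};
    /// # let metrics = TelemetryProvider::init(TelemetryConfig::default()).unwrap().metrics();
    /// let start = std::time::Instant::now();
    /// // ... process the event ...
    /// metrics.record_processing_latency(start.elapsed().as_secs_f64() * 1000.0, "user_feedback");
    /// ```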
    pub fn record_processing_latency(&self, duration_ms: f64, event_type: &str) {
        self.processing_latency.record(
            duration_ms,
            &[KeyValue::new("event_type", event_type.to_string())],
        );
    }

    /// Increment active collectors
    pub fn increment_active_collectors(&self) {
        self.active_collectors.add(1, &[]);
    }

    /// Decrement active collectors
    pub fn decrement_active_collectors(&self) {
        self.active_collectors.add(-1, &[]);
    }

    /// Record batch size
    pub fn record_batch_size(&self, size: u64) {
        self.batch_size.record(size, &[]);
    }

    /// Record error
    pub fn record_error(&self, error_type: &str) {
        self.error_counter.add(1, &[KeyValue::new("error_type", error_type.to_string())]);
    }

    /// Record Kafka publish latency
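    ///
    /// A sketch of instrumenting a publish call (`publish` here stands in for
    /// a hypothetical fallible operation, and the crate path is assumed):
    ///
    /// ```no_run
    /// # use llm_optimizer_collector::telemetry::{TelemetryConfig, TelemetryProvider};
    /// # let metrics = TelemetryProvider::init(TelemetryConfig::default()).unwrap().metrics();
    /// # let publish = || -> Result<(), std::io::Error> { Ok(()) };
    /// let start = std::time::Instant::now();
    /// let result = publish();
    /// metrics.record_kafka_publish_latency(start.elapsed().as_secs_f64() * 1000.0, result.is_ok());
    /// if result.is_err() {
    ///     metrics.record_kafka_error("publish_failed");
    /// }
    /// ```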
    pub fn record_kafka_publish_latency(&self, duration_ms: f64, success: bool) {
        self.kafka_publish_latency.record(
            duration_ms,
            &[KeyValue::new("success", success.to_string())],
        );
    }

    /// Record Kafka error
    pub fn record_kafka_error(&self, error_type: &str) {
        self.kafka_errors.add(1, &[KeyValue::new("error_type", error_type.to_string())]);
    }

    /// Record rate limit check
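    ///
    /// A sketch of a rate-limit decision path using this module's counters
    /// (`is_allowed` is a hypothetical limiter result, and the crate path is
    /// assumed):
    ///
    /// ```no_run
    /// # use llm_optimizer_collector::telemetry::{TelemetryConfig, TelemetryProvider};
    /// # let metrics = TelemetryProvider::init(TelemetryConfig::default()).unwrap().metrics();
    /// # let is_allowed = true;
    /// metrics.record_rate_limit_check(is_allowed);
    /// if is_allowed {
    ///     // One more request now counts against the limit.
    ///     metrics.update_rate_limit_usage(1);
    /// }
    /// ```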
    pub fn record_rate_limit_check(&self, allowed: bool) {
        let status = if allowed { "allowed" } else { "denied" };
        self.rate_limit_checks.add(1, &[KeyValue::new("status", status.to_string())]);
    }

    /// Update rate limit usage
    pub fn update_rate_limit_usage(&self, delta: i64) {
        self.rate_limit_usage.add(delta, &[]);
    }

    /// Adjust the count of actively tracked sources by `delta`
    /// (positive when sources are added, negative when they are removed)
    pub fn update_rate_limit_sources(&self, delta: i64) {
        self.rate_limit_active_sources.add(delta, &[]);
    }
}
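
// A minimal RAII sketch (the `CollectorGuard` name is a hypothetical helper):
// pairing `increment_active_collectors` with a `Drop` impl keeps the
// active-collectors up-down counter balanced even when a collector task
// exits early.
pub struct CollectorGuard {
    metrics: Arc<TelemetryMetrics>,
}

impl CollectorGuard {
    /// Register a collector; the count is decremented when the guard drops.
    pub fn new(metrics: Arc<TelemetryMetrics>) -> Self {
        metrics.increment_active_collectors();
        Self { metrics }
    }
}

impl Drop for CollectorGuard {
    fn drop(&mut self) {
        self.metrics.decrement_active_collectors();
    }
}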

/// Telemetry provider configuration
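///
/// Defaults can be overridden with a struct literal. Note that
/// `otlp_endpoint` and `export_interval_secs` are stored but not yet wired
/// to an exporter (see the TODO in [`TelemetryProvider::init`]); the
/// endpoint below is just an example value:
///
/// ```no_run
/// # use llm_optimizer_collector::telemetry::TelemetryConfig;
/// let config = TelemetryConfig {
///     export_interval_secs: 30,
///     otlp_endpoint: Some("http://localhost:4317".to_string()),
///     ..TelemetryConfig::default()
/// };
/// ```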
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Service name
    pub service_name: String,
    /// Service version
    pub service_version: String,
    /// Export interval in seconds
    pub export_interval_secs: u64,
    /// OTLP endpoint (optional)
    pub otlp_endpoint: Option<String>,
}

impl Default for TelemetryConfig {
    fn default() -> Self {
        Self {
            service_name: "llm-optimizer-collector".to_string(),
            service_version: env!("CARGO_PKG_VERSION").to_string(),
            export_interval_secs: 60,
            otlp_endpoint: None,
        }
    }
}

/// Telemetry provider
pub struct TelemetryProvider {
    /// Meter provider
    meter_provider: SdkMeterProvider,
    /// Telemetry metrics
    metrics: Arc<TelemetryMetrics>,
}

impl TelemetryProvider {
    /// Initialize telemetry provider
    pub fn init(config: TelemetryConfig) -> Result<Self> {
        // Create resource
        let resource = Resource::new(vec![
            KeyValue::new("service.name", config.service_name.clone()),
            KeyValue::new("service.version", config.service_version.clone()),
        ]);

        // Create meter provider
        // For development without OTLP, create a meter provider without periodic export
        // Metrics will still be collected but not exported
        // TODO: Add OTLP exporter support when endpoint is configured
        let meter_provider = SdkMeterProvider::builder()
            .with_resource(resource)
            .build();

        // Create meter
        let meter = meter_provider.meter("feedback_collector");

        // Create metrics
        let metrics = Arc::new(TelemetryMetrics::new(meter)?);

        Ok(Self {
            meter_provider,
            metrics,
        })
    }

    /// Get telemetry metrics
    pub fn metrics(&self) -> Arc<TelemetryMetrics> {
        self.metrics.clone()
    }

    /// Shutdown telemetry
    pub fn shutdown(self) -> Result<()> {
        self.meter_provider
            .shutdown()
            .map_err(|e| TelemetryError::ExportError(e.to_string()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::feedback_events::UserFeedbackEvent;
    use uuid::Uuid;

    #[test]
    fn test_telemetry_config_default() {
        let config = TelemetryConfig::default();
        assert_eq!(config.service_name, "llm-optimizer-collector");
        assert_eq!(config.export_interval_secs, 60);
    }

    #[test]
    fn test_telemetry_provider_init() {
        let config = TelemetryConfig::default();
        let provider = TelemetryProvider::init(config).unwrap();
        let metrics = provider.metrics();

        // Both the provider and this handle hold a reference
        assert_eq!(Arc::strong_count(&metrics), 2);

        // Shutdown
        provider.shutdown().unwrap();
    }

    #[test]
    fn test_record_event() {
        let config = TelemetryConfig::default();
        let provider = TelemetryProvider::init(config).unwrap();
        let metrics = provider.metrics();

        let request_id = Uuid::new_v4();
        let event = FeedbackEvent::UserFeedback(UserFeedbackEvent::new("session1", request_id));

        // Should not panic
        metrics.record_event(&event);

        provider.shutdown().unwrap();
    }

    #[test]
    fn test_record_metrics() {
        let config = TelemetryConfig::default();
        let provider = TelemetryProvider::init(config).unwrap();
        let metrics = provider.metrics();

        // Record various metrics
        metrics.record_processing_latency(100.0, "user_feedback");
        metrics.increment_active_collectors();
        metrics.record_batch_size(50);
        metrics.record_error("validation_error");
        metrics.record_kafka_publish_latency(25.0, true);
        metrics.decrement_active_collectors();

        // Should not panic
        provider.shutdown().unwrap();
    }

    #[test]
    fn test_active_collectors_tracking() {
        let config = TelemetryConfig::default();
        let provider = TelemetryProvider::init(config).unwrap();
        let metrics = provider.metrics();

        metrics.increment_active_collectors();
        metrics.increment_active_collectors();
        metrics.decrement_active_collectors();

        // Net: +1 collector
        provider.shutdown().unwrap();
    }

    #[test]
    fn test_kafka_metrics() {
        let config = TelemetryConfig::default();
        let provider = TelemetryProvider::init(config).unwrap();
        let metrics = provider.metrics();

        metrics.record_kafka_publish_latency(50.0, true);
        metrics.record_kafka_publish_latency(150.0, false);
        metrics.record_kafka_error("connection_error");

        provider.shutdown().unwrap();
    }
}