1use opentelemetry::metrics::{Counter, Histogram, Meter, MeterProvider as _, UpDownCounter};
7use opentelemetry::KeyValue;
8use opentelemetry_sdk::metrics::SdkMeterProvider;
9use opentelemetry_sdk::Resource;
10use std::sync::Arc;
11use thiserror::Error;
12
13use crate::feedback_events::FeedbackEvent;
14
/// Errors produced by the telemetry subsystem.
#[derive(Error, Debug)]
pub enum TelemetryError {
    /// Failure while setting up the telemetry pipeline (meter provider, instruments, …).
    #[error("Telemetry initialization error: {0}")]
    InitializationError(String),

    /// Failure while exporting or flushing recorded metrics.
    #[error("Metrics export error: {0}")]
    ExportError(String),

    /// Error surfaced directly by the OpenTelemetry metrics API.
    #[error(transparent)]
    OpenTelemetryError(#[from] opentelemetry::metrics::MetricsError),
}

/// Convenience alias for results returned by this module.
pub type Result<T> = std::result::Result<T, TelemetryError>;
29
/// Bundle of OpenTelemetry instruments used by the feedback collector.
///
/// All instruments are created once in [`TelemetryMetrics::new`] and are cheap
/// to record against afterwards.
pub struct TelemetryMetrics {
    /// Meter the instruments were created from; retained so further
    /// instruments can be built against the same scope if needed.
    meter: Meter,
    /// Total feedback events received, labelled by event type.
    event_counter: Counter<u64>,
    /// Per-event processing latency in milliseconds, labelled by event type.
    processing_latency: Histogram<f64>,
    /// Number of currently active feedback collectors.
    active_collectors: UpDownCounter<i64>,
    /// Distribution of event-batch sizes.
    batch_size: Histogram<u64>,
    /// Total feedback processing errors, labelled by error type.
    error_counter: Counter<u64>,
    /// Kafka publish latency in milliseconds, labelled by success/failure.
    kafka_publish_latency: Histogram<f64>,
    /// Total Kafka publish errors, labelled by error type.
    kafka_errors: Counter<u64>,
    /// Total rate-limit checks, labelled "allowed"/"denied".
    rate_limit_checks: Counter<u64>,
    /// Current rate-limit usage (caller-supplied deltas).
    rate_limit_usage: UpDownCounter<i64>,
    /// Number of distinct sources currently tracked by the rate limiter.
    rate_limit_active_sources: UpDownCounter<i64>,
}
55
56impl TelemetryMetrics {
57 pub fn new(meter: Meter) -> Result<Self> {
59 let event_counter = meter
60 .u64_counter("feedback_events_total")
61 .with_description("Total number of feedback events received")
62 .with_unit("events")
63 .init();
64
65 let processing_latency = meter
66 .f64_histogram("feedback_processing_duration_ms")
67 .with_description("Time taken to process feedback events")
68 .with_unit("ms")
69 .init();
70
71 let active_collectors = meter
72 .i64_up_down_counter("active_feedback_collectors")
73 .with_description("Number of active feedback collectors")
74 .init();
75
76 let batch_size = meter
77 .u64_histogram("feedback_batch_size")
78 .with_description("Number of events in each batch")
79 .with_unit("events")
80 .init();
81
82 let error_counter = meter
83 .u64_counter("feedback_errors_total")
84 .with_description("Total number of feedback processing errors")
85 .with_unit("errors")
86 .init();
87
88 let kafka_publish_latency = meter
89 .f64_histogram("kafka_publish_duration_ms")
90 .with_description("Time taken to publish to Kafka")
91 .with_unit("ms")
92 .init();
93
94 let kafka_errors = meter
95 .u64_counter("kafka_errors_total")
96 .with_description("Total number of Kafka publish errors")
97 .with_unit("errors")
98 .init();
99
100 let rate_limit_checks = meter
101 .u64_counter("rate_limit_checks_total")
102 .with_description("Total number of rate limit checks")
103 .with_unit("checks")
104 .init();
105
106 let rate_limit_usage = meter
107 .i64_up_down_counter("rate_limit_current_usage")
108 .with_description("Current rate limit usage")
109 .init();
110
111 let rate_limit_active_sources = meter
112 .i64_up_down_counter("rate_limit_active_sources")
113 .with_description("Number of active sources being tracked")
114 .init();
115
116 Ok(Self {
117 meter,
118 event_counter,
119 processing_latency,
120 active_collectors,
121 batch_size,
122 error_counter,
123 kafka_publish_latency,
124 kafka_errors,
125 rate_limit_checks,
126 rate_limit_usage,
127 rate_limit_active_sources,
128 })
129 }
130
131 pub fn record_event(&self, event: &FeedbackEvent) {
133 let event_type = event.event_type();
134 self.event_counter.add(1, &[KeyValue::new("event_type", event_type)]);
135 }
136
137 pub fn record_processing_latency(&self, duration_ms: f64, event_type: &str) {
139 self.processing_latency.record(
140 duration_ms,
141 &[KeyValue::new("event_type", event_type.to_string())],
142 );
143 }
144
145 pub fn increment_active_collectors(&self) {
147 self.active_collectors.add(1, &[]);
148 }
149
150 pub fn decrement_active_collectors(&self) {
152 self.active_collectors.add(-1, &[]);
153 }
154
155 pub fn record_batch_size(&self, size: u64) {
157 self.batch_size.record(size, &[]);
158 }
159
160 pub fn record_error(&self, error_type: &str) {
162 self.error_counter.add(1, &[KeyValue::new("error_type", error_type.to_string())]);
163 }
164
165 pub fn record_kafka_publish_latency(&self, duration_ms: f64, success: bool) {
167 self.kafka_publish_latency.record(
168 duration_ms,
169 &[KeyValue::new("success", success.to_string())],
170 );
171 }
172
173 pub fn record_kafka_error(&self, error_type: &str) {
175 self.kafka_errors.add(1, &[KeyValue::new("error_type", error_type.to_string())]);
176 }
177
178 pub fn record_rate_limit_check(&self, allowed: bool) {
180 let status = if allowed { "allowed" } else { "denied" };
181 self.rate_limit_checks.add(1, &[KeyValue::new("status", status.to_string())]);
182 }
183
184 pub fn update_rate_limit_usage(&self, delta: i64) {
186 self.rate_limit_usage.add(delta, &[]);
187 }
188
189 pub fn update_rate_limit_sources(&self, count: i64) {
191 self.rate_limit_active_sources.add(count, &[]);
192 }
193}
194
/// Configuration consumed by [`TelemetryProvider::init`].
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    /// Logical service name, reported as the `service.name` resource attribute.
    pub service_name: String,
    /// Service version, reported as the `service.version` resource attribute.
    pub service_version: String,
    /// Desired metric export interval in seconds.
    /// NOTE(review): not consumed by `TelemetryProvider::init` in this file —
    /// confirm whether a periodic reader should be wired from it.
    pub export_interval_secs: u64,
    /// Optional OTLP collector endpoint; `None` means no remote export.
    /// NOTE(review): not consumed by `TelemetryProvider::init` in this file —
    /// confirm whether an OTLP exporter should be configured from it.
    pub otlp_endpoint: Option<String>,
}
207
208impl Default for TelemetryConfig {
209 fn default() -> Self {
210 Self {
211 service_name: "llm-optimizer-collector".to_string(),
212 service_version: env!("CARGO_PKG_VERSION").to_string(),
213 export_interval_secs: 60,
214 otlp_endpoint: None,
215 }
216 }
217}
218
/// Owns the SDK meter provider and the shared instrument set.
pub struct TelemetryProvider {
    /// Underlying SDK provider; shut down via [`TelemetryProvider::shutdown`].
    meter_provider: SdkMeterProvider,
    /// Shared handle to the collector's instruments, cloned out via
    /// [`TelemetryProvider::metrics`].
    metrics: Arc<TelemetryMetrics>,
}
226
227impl TelemetryProvider {
228 pub fn init(config: TelemetryConfig) -> Result<Self> {
230 let resource = Resource::new(vec![
232 KeyValue::new("service.name", config.service_name.clone()),
233 KeyValue::new("service.version", config.service_version.clone()),
234 ]);
235
236 let meter_provider = SdkMeterProvider::builder()
241 .with_resource(resource)
242 .build();
243
244 let meter = meter_provider.meter("feedback_collector");
246
247 let metrics = Arc::new(TelemetryMetrics::new(meter)?);
249
250 Ok(Self {
251 meter_provider,
252 metrics,
253 })
254 }
255
256 pub fn metrics(&self) -> Arc<TelemetryMetrics> {
258 self.metrics.clone()
259 }
260
261 pub fn shutdown(self) -> Result<()> {
263 self.meter_provider
264 .shutdown()
265 .map_err(|e| TelemetryError::ExportError(e.to_string()))
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    // `PerformanceMetricsEvent` was imported but never used — removed.
    use crate::feedback_events::UserFeedbackEvent;
    use uuid::Uuid;

    /// Build a provider from the default config plus its shared metrics handle.
    fn setup() -> (TelemetryProvider, Arc<TelemetryMetrics>) {
        let provider = TelemetryProvider::init(TelemetryConfig::default()).unwrap();
        let metrics = provider.metrics();
        (provider, metrics)
    }

    #[test]
    fn test_telemetry_config_default() {
        let config = TelemetryConfig::default();
        assert_eq!(config.service_name, "llm-optimizer-collector");
        assert_eq!(config.export_interval_secs, 60);
    }

    #[test]
    fn test_telemetry_provider_init() {
        let (provider, metrics) = setup();

        assert!(Arc::strong_count(&metrics) >= 1);

        provider.shutdown().unwrap();
    }

    #[test]
    fn test_record_event() {
        let (provider, metrics) = setup();

        let request_id = Uuid::new_v4();
        let event = FeedbackEvent::UserFeedback(UserFeedbackEvent::new("session1", request_id));

        metrics.record_event(&event);

        provider.shutdown().unwrap();
    }

    #[test]
    fn test_record_metrics() {
        let (provider, metrics) = setup();

        metrics.record_processing_latency(100.0, "user_feedback");
        metrics.increment_active_collectors();
        metrics.record_batch_size(50);
        metrics.record_error("validation_error");
        metrics.record_kafka_publish_latency(25.0, true);
        metrics.decrement_active_collectors();

        provider.shutdown().unwrap();
    }

    #[test]
    fn test_active_collectors_tracking() {
        let (provider, metrics) = setup();

        metrics.increment_active_collectors();
        metrics.increment_active_collectors();
        metrics.decrement_active_collectors();

        provider.shutdown().unwrap();
    }

    #[test]
    fn test_kafka_metrics() {
        let (provider, metrics) = setup();

        metrics.record_kafka_publish_latency(50.0, true);
        metrics.record_kafka_publish_latency(150.0, false);
        metrics.record_kafka_error("connection_error");

        provider.shutdown().unwrap();
    }
}