// do_memory_core/embeddings/metrics.rs

//! Performance metrics for embedding providers

use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Performance metrics for an embedding provider.
///
/// Every counter is an independent [`AtomicU64`] that is updated with
/// [`Ordering::Relaxed`], so recording goes through `&self` and never takes a
/// lock. No ordering is established between different counters; read them
/// together through [`ProviderMetrics::snapshot`].
#[derive(Debug, Default)]
pub struct ProviderMetrics {
    // Request counts
    total_requests: AtomicU64,
    successful_requests: AtomicU64,
    failed_requests: AtomicU64,
    retried_requests: AtomicU64,

    // Processing metrics
    total_items_embedded: AtomicU64,
    total_tokens_used: AtomicU64,

    // Latency tracking (stored as a sum; the average is derived in `snapshot`)
    total_latency_ms: AtomicU64,

    // Circuit breaker metrics
    circuit_breaker_opens: AtomicU64,
    circuit_breaker_closes: AtomicU64,

    // Compression metrics
    bytes_sent_uncompressed: AtomicU64,
    bytes_sent_compressed: AtomicU64,
}

/// Snapshot of metrics at a point in time.
///
/// A plain-data copy of the counters held by [`ProviderMetrics`], plus the
/// derived `average_latency_ms`.
#[derive(Debug, Clone)]
pub struct MetricsSnapshot {
    /// Number of calls to [`ProviderMetrics::record_request`].
    pub total_requests: u64,
    /// Number of calls to [`ProviderMetrics::record_success`].
    pub successful_requests: u64,
    /// Number of calls to [`ProviderMetrics::record_failure`].
    pub failed_requests: u64,
    /// Number of calls to [`ProviderMetrics::record_retry`].
    pub retried_requests: u64,
    /// Sum of the `items` values passed to `record_success`.
    pub total_items_embedded: u64,
    /// Sum of the `tokens` values passed to `record_success`.
    pub total_tokens_used: u64,
    /// Summed latency divided by successful requests (integer division);
    /// zero when no success has been recorded.
    pub average_latency_ms: u64,
    /// Number of recorded circuit breaker openings.
    pub circuit_breaker_opens: u64,
    /// Number of recorded circuit breaker closings.
    pub circuit_breaker_closes: u64,
    /// Sum of the `uncompressed` values passed to `record_compression`.
    pub bytes_sent_uncompressed: u64,
    /// Sum of the `compressed` values passed to `record_compression`.
    pub bytes_sent_compressed: u64,
}

impl ProviderMetrics {
    /// Create a new metrics collector with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Record a request attempt.
    pub fn record_request(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a successful request.
    ///
    /// # Arguments
    /// * `items` - Number of items embedded by the request
    /// * `tokens` - Number of tokens consumed by the request
    /// * `latency_ms` - Request latency in milliseconds
    pub fn record_success(&self, items: u64, tokens: u64, latency_ms: u64) {
        self.successful_requests.fetch_add(1, Ordering::Relaxed);
        self.total_items_embedded
            .fetch_add(items, Ordering::Relaxed);
        self.total_tokens_used.fetch_add(tokens, Ordering::Relaxed);
        self.total_latency_ms
            .fetch_add(latency_ms, Ordering::Relaxed);
    }

    /// Record a failed request.
    pub fn record_failure(&self) {
        self.failed_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a retry attempt.
    pub fn record_retry(&self) {
        self.retried_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Record circuit breaker opening.
    pub fn record_circuit_breaker_open(&self) {
        self.circuit_breaker_opens.fetch_add(1, Ordering::Relaxed);
    }

    /// Record circuit breaker closing.
    pub fn record_circuit_breaker_close(&self) {
        self.circuit_breaker_closes.fetch_add(1, Ordering::Relaxed);
    }

    /// Record compression metrics.
    ///
    /// # Arguments
    /// * `uncompressed` - Payload size in bytes before compression
    /// * `compressed` - Payload size in bytes after compression
    pub fn record_compression(&self, uncompressed: u64, compressed: u64) {
        self.bytes_sent_uncompressed
            .fetch_add(uncompressed, Ordering::Relaxed);
        self.bytes_sent_compressed
            .fetch_add(compressed, Ordering::Relaxed);
    }

    /// Get a snapshot of current metrics.
    ///
    /// Each counter is loaded separately with relaxed ordering, so values
    /// recorded concurrently with this call may be reflected in some fields
    /// and not in others.
    #[must_use]
    pub fn snapshot(&self) -> MetricsSnapshot {
        let successful_requests = self.successful_requests.load(Ordering::Relaxed);
        let total_latency_ms = self.total_latency_ms.load(Ordering::Relaxed);

        MetricsSnapshot {
            total_requests: self.total_requests.load(Ordering::Relaxed),
            successful_requests,
            failed_requests: self.failed_requests.load(Ordering::Relaxed),
            retried_requests: self.retried_requests.load(Ordering::Relaxed),
            total_items_embedded: self.total_items_embedded.load(Ordering::Relaxed),
            total_tokens_used: self.total_tokens_used.load(Ordering::Relaxed),
            // `checked_div` yields `None` only when no success was recorded.
            average_latency_ms: total_latency_ms
                .checked_div(successful_requests)
                .unwrap_or(0),
            circuit_breaker_opens: self.circuit_breaker_opens.load(Ordering::Relaxed),
            circuit_breaker_closes: self.circuit_breaker_closes.load(Ordering::Relaxed),
            bytes_sent_uncompressed: self.bytes_sent_uncompressed.load(Ordering::Relaxed),
            bytes_sent_compressed: self.bytes_sent_compressed.load(Ordering::Relaxed),
        }
    }

    /// Reset all metrics to zero.
    ///
    /// Counters are cleared one at a time; the reset is not atomic as a whole.
    pub fn reset(&self) {
        for counter in self.counters() {
            counter.store(0, Ordering::Relaxed);
        }
    }

    /// Every counter owned by this collector, in declaration order.
    fn counters(&self) -> [&AtomicU64; 11] {
        [
            &self.total_requests,
            &self.successful_requests,
            &self.failed_requests,
            &self.retried_requests,
            &self.total_items_embedded,
            &self.total_tokens_used,
            &self.total_latency_ms,
            &self.circuit_breaker_opens,
            &self.circuit_breaker_closes,
            &self.bytes_sent_uncompressed,
            &self.bytes_sent_compressed,
        ]
    }
}

impl MetricsSnapshot {
    /// `part` as a percentage of `whole`; `0.0` when `whole` is zero.
    fn percentage(part: u64, whole: u64) -> f64 {
        if whole == 0 {
            return 0.0;
        }
        (part as f64 / whole as f64) * 100.0
    }

    /// Calculate success rate as a percentage of total requests.
    ///
    /// Returns `0.0` when no request has been recorded.
    #[must_use]
    pub fn success_rate(&self) -> f64 {
        Self::percentage(self.successful_requests, self.total_requests)
    }

    /// Calculate failure rate as a percentage of total requests.
    ///
    /// Returns `0.0` when no request has been recorded.
    #[must_use]
    pub fn failure_rate(&self) -> f64 {
        Self::percentage(self.failed_requests, self.total_requests)
    }

    /// Calculate retry rate as a percentage of total requests.
    ///
    /// Returns `0.0` when no request has been recorded.
    #[must_use]
    pub fn retry_rate(&self) -> f64 {
        Self::percentage(self.retried_requests, self.total_requests)
    }

    /// Calculate compression ratio as a percentage: compressed bytes relative
    /// to uncompressed bytes.
    ///
    /// Returns `0.0` when no uncompressed bytes have been recorded.
    #[must_use]
    pub fn compression_ratio(&self) -> f64 {
        Self::percentage(self.bytes_sent_compressed, self.bytes_sent_uncompressed)
    }

    /// Calculate bandwidth saved by compression, in bytes.
    ///
    /// Saturates at zero when the compressed total exceeds the uncompressed one.
    #[must_use]
    pub fn bytes_saved(&self) -> u64 {
        self.bytes_sent_uncompressed
            .saturating_sub(self.bytes_sent_compressed)
    }

    /// Estimate cost based on token usage (`OpenAI` pricing)
    ///
    /// # Arguments
    /// * `cost_per_million_tokens` - Cost per 1M tokens (e.g., 0.02 for text-embedding-3-small)
    #[must_use]
    pub fn estimated_cost(&self, cost_per_million_tokens: f64) -> f64 {
        (self.total_tokens_used as f64 / 1_000_000.0) * cost_per_million_tokens
    }

    /// Calculate average items per successful request.
    ///
    /// Returns `0.0` when no successful request has been recorded.
    #[must_use]
    pub fn average_batch_size(&self) -> f64 {
        if self.successful_requests == 0 {
            return 0.0;
        }
        self.total_items_embedded as f64 / self.successful_requests as f64
    }

    /// Format metrics as a human-readable string.
    ///
    /// Produces the same text as the [`fmt::Display`] implementation.
    #[must_use]
    pub fn format(&self) -> String {
        self.to_string()
    }
}

impl fmt::Display for MetricsSnapshot {
    /// Write the multi-line, human-readable report (no trailing newline).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "Provider Metrics:")?;
        writeln!(
            f,
            "  Requests:        {} total ({} success, {} failed)",
            self.total_requests, self.successful_requests, self.failed_requests
        )?;
        writeln!(f, "  Success Rate:    {:.2}%", self.success_rate())?;
        writeln!(f, "  Retry Rate:      {:.2}%", self.retry_rate())?;
        writeln!(f, "  Items Embedded:  {}", self.total_items_embedded)?;
        writeln!(f, "  Tokens Used:     {}", self.total_tokens_used)?;
        writeln!(f, "  Avg Latency:     {}ms", self.average_latency_ms)?;
        writeln!(
            f,
            "  Avg Batch Size:  {:.1} items",
            self.average_batch_size()
        )?;
        writeln!(
            f,
            "  Circuit Breaker: {} opens, {} closes",
            self.circuit_breaker_opens, self.circuit_breaker_closes
        )?;
        write!(
            f,
            "  Compression:     {:.1}% ratio ({} bytes saved)",
            self.compression_ratio(),
            self.bytes_saved()
        )
    }
}

/// Helper to measure latency.
///
/// Captures an [`Instant`] when started and reports the time elapsed since
/// then in whole milliseconds.
#[derive(Debug)]
pub struct LatencyTimer {
    start: Instant,
}

impl LatencyTimer {
    /// Start a new latency timer at the current instant.
    #[must_use]
    pub fn start() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Get elapsed time in milliseconds.
    ///
    /// Fractions of a millisecond are dropped. The value saturates at
    /// `u64::MAX` instead of wrapping.
    #[must_use]
    pub fn elapsed_ms(&self) -> u64 {
        let elapsed = self.start.elapsed();
        // Whole seconds plus the sub-second part equals `as_millis()`, but
        // stays in `u64` so no narrowing cast is needed.
        elapsed
            .as_secs()
            .saturating_mul(1_000)
            .saturating_add(u64::from(elapsed.subsec_millis()))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing floating-point results.
    const TOLERANCE: f64 = 1e-9;

    /// Assert that two floats agree within [`TOLERANCE`], scaled by the
    /// magnitude of the expected value, instead of relying on exact equality.
    fn assert_close(actual: f64, expected: f64) {
        let allowed = TOLERANCE * expected.abs().max(1.0);
        assert!(
            (actual - expected).abs() <= allowed,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    #[test]
    fn test_metrics_recording() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(10, 1000, 100);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_requests, 1);
        assert_eq!(snapshot.successful_requests, 1);
        assert_eq!(snapshot.total_items_embedded, 10);
        assert_eq!(snapshot.total_tokens_used, 1000);
        assert_eq!(snapshot.average_latency_ms, 100);
    }

    #[test]
    fn test_success_rate() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(1, 100, 50);
        metrics.record_request();
        metrics.record_success(1, 100, 50);
        metrics.record_request();
        metrics.record_failure();

        let snapshot = metrics.snapshot();
        assert_close(snapshot.success_rate(), 200.0 / 3.0);
        assert_close(snapshot.failure_rate(), 100.0 / 3.0);
    }

    #[test]
    fn test_empty_snapshot_rates_are_zero() {
        // With no recorded requests every ratio takes its zero-denominator path.
        let snapshot = ProviderMetrics::new().snapshot();

        assert_close(snapshot.success_rate(), 0.0);
        assert_close(snapshot.failure_rate(), 0.0);
        assert_close(snapshot.retry_rate(), 0.0);
        assert_close(snapshot.compression_ratio(), 0.0);
        assert_close(snapshot.average_batch_size(), 0.0);
        assert_eq!(snapshot.average_latency_ms, 0);
        assert_eq!(snapshot.bytes_saved(), 0);
    }

    #[test]
    fn test_compression_metrics() {
        let metrics = ProviderMetrics::new();

        metrics.record_compression(1000, 300);
        metrics.record_compression(2000, 600);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.bytes_sent_uncompressed, 3000);
        assert_eq!(snapshot.bytes_sent_compressed, 900);
        assert_close(snapshot.compression_ratio(), 30.0);
        assert_eq!(snapshot.bytes_saved(), 2100);
    }

    #[test]
    fn test_bytes_saved_saturates() {
        let metrics = ProviderMetrics::new();

        // Compressed output larger than the input must not underflow.
        metrics.record_compression(100, 150);

        assert_eq!(metrics.snapshot().bytes_saved(), 0);
    }

    #[test]
    fn test_estimated_cost() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(100, 10_000, 100);

        let snapshot = metrics.snapshot();
        assert_close(snapshot.estimated_cost(0.02), 0.0002); // $0.02 per 1M tokens
    }

    #[test]
    fn test_average_batch_size() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(100, 1000, 50);
        metrics.record_request();
        metrics.record_success(200, 2000, 60);

        let snapshot = metrics.snapshot();
        assert_close(snapshot.average_batch_size(), 150.0);
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(10, 1000, 100);
        metrics.record_failure();
        metrics.record_retry();
        metrics.record_circuit_breaker_open();
        metrics.record_circuit_breaker_close();
        metrics.record_compression(1000, 300);
        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_requests, 0);
        assert_eq!(snapshot.successful_requests, 0);
        assert_eq!(snapshot.failed_requests, 0);
        assert_eq!(snapshot.retried_requests, 0);
        assert_eq!(snapshot.total_items_embedded, 0);
        assert_eq!(snapshot.total_tokens_used, 0);
        assert_eq!(snapshot.average_latency_ms, 0);
        assert_eq!(snapshot.circuit_breaker_opens, 0);
        assert_eq!(snapshot.circuit_breaker_closes, 0);
        assert_eq!(snapshot.bytes_sent_uncompressed, 0);
        assert_eq!(snapshot.bytes_sent_compressed, 0);
    }

    #[test]
    fn test_latency_timer() {
        let timer = LatencyTimer::start();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = timer.elapsed_ms();
        assert!(elapsed >= 10);
    }
}