Skip to main content

do_memory_core/embeddings/
metrics.rs

1//! Performance metrics for embedding providers
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Instant;
5
/// Atomic counters describing the activity of one embedding provider.
///
/// Every field is an [`AtomicU64`], so values can be updated through a shared
/// reference. The derived `Default` starts every counter at zero.
#[derive(Debug, Default)]
pub struct ProviderMetrics {
    /// Number of request attempts recorded.
    total_requests: AtomicU64,
    /// Number of requests recorded as successful.
    successful_requests: AtomicU64,
    /// Number of requests recorded as failed.
    failed_requests: AtomicU64,
    /// Number of retry attempts recorded.
    retried_requests: AtomicU64,

    /// Running total of items embedded.
    total_items_embedded: AtomicU64,
    /// Running total of tokens consumed.
    total_tokens_used: AtomicU64,

    /// Sum of recorded latencies in milliseconds; kept as a sum so an average
    /// can be derived later.
    total_latency_ms: AtomicU64,

    /// Number of times the circuit breaker was recorded as opening.
    circuit_breaker_opens: AtomicU64,
    /// Number of times the circuit breaker was recorded as closing.
    circuit_breaker_closes: AtomicU64,

    /// Running total of payload bytes measured before compression.
    bytes_sent_uncompressed: AtomicU64,
    /// Running total of payload bytes measured after compression.
    bytes_sent_compressed: AtomicU64,
}
30
/// Plain-value copy of the provider counters taken at one point in time.
///
/// All fields are ordinary `u64` values, so a snapshot can be cloned,
/// compared for equality, and default-constructed (all zeros).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct MetricsSnapshot {
    /// Number of request attempts recorded.
    pub total_requests: u64,
    /// Number of requests recorded as successful.
    pub successful_requests: u64,
    /// Number of requests recorded as failed.
    pub failed_requests: u64,
    /// Number of retry attempts recorded.
    pub retried_requests: u64,
    /// Total number of items embedded.
    pub total_items_embedded: u64,
    /// Total number of tokens consumed.
    pub total_tokens_used: u64,
    /// Summed latency divided by the number of successful requests, in whole
    /// milliseconds; `0` when no request has succeeded.
    pub average_latency_ms: u64,
    /// Number of recorded circuit breaker openings.
    pub circuit_breaker_opens: u64,
    /// Number of recorded circuit breaker closings.
    pub circuit_breaker_closes: u64,
    /// Total payload bytes measured before compression.
    pub bytes_sent_uncompressed: u64,
    /// Total payload bytes measured after compression.
    pub bytes_sent_compressed: u64,
}
46
47impl ProviderMetrics {
48    /// Create a new metrics collector
49    #[must_use]
50    pub fn new() -> Self {
51        Self::default()
52    }
53
54    /// Record a request attempt
55    pub fn record_request(&self) {
56        self.total_requests.fetch_add(1, Ordering::Relaxed);
57    }
58
59    /// Record a successful request
60    pub fn record_success(&self, items: u64, tokens: u64, latency_ms: u64) {
61        self.successful_requests.fetch_add(1, Ordering::Relaxed);
62        self.total_items_embedded
63            .fetch_add(items, Ordering::Relaxed);
64        self.total_tokens_used.fetch_add(tokens, Ordering::Relaxed);
65        self.total_latency_ms
66            .fetch_add(latency_ms, Ordering::Relaxed);
67    }
68
69    /// Record a failed request
70    pub fn record_failure(&self) {
71        self.failed_requests.fetch_add(1, Ordering::Relaxed);
72    }
73
74    /// Record a retry attempt
75    pub fn record_retry(&self) {
76        self.retried_requests.fetch_add(1, Ordering::Relaxed);
77    }
78
79    /// Record circuit breaker opening
80    pub fn record_circuit_breaker_open(&self) {
81        self.circuit_breaker_opens.fetch_add(1, Ordering::Relaxed);
82    }
83
84    /// Record circuit breaker closing
85    pub fn record_circuit_breaker_close(&self) {
86        self.circuit_breaker_closes.fetch_add(1, Ordering::Relaxed);
87    }
88
89    /// Record compression metrics
90    pub fn record_compression(&self, uncompressed: u64, compressed: u64) {
91        self.bytes_sent_uncompressed
92            .fetch_add(uncompressed, Ordering::Relaxed);
93        self.bytes_sent_compressed
94            .fetch_add(compressed, Ordering::Relaxed);
95    }
96
97    /// Get a snapshot of current metrics
98    #[must_use]
99    pub fn snapshot(&self) -> MetricsSnapshot {
100        let total_requests = self.total_requests.load(Ordering::Relaxed);
101        let successful_requests = self.successful_requests.load(Ordering::Relaxed);
102        let total_latency_ms = self.total_latency_ms.load(Ordering::Relaxed);
103
104        let average_latency_ms = total_latency_ms
105            .checked_div(successful_requests)
106            .unwrap_or(0);
107
108        MetricsSnapshot {
109            total_requests,
110            successful_requests,
111            failed_requests: self.failed_requests.load(Ordering::Relaxed),
112            retried_requests: self.retried_requests.load(Ordering::Relaxed),
113            total_items_embedded: self.total_items_embedded.load(Ordering::Relaxed),
114            total_tokens_used: self.total_tokens_used.load(Ordering::Relaxed),
115            average_latency_ms,
116            circuit_breaker_opens: self.circuit_breaker_opens.load(Ordering::Relaxed),
117            circuit_breaker_closes: self.circuit_breaker_closes.load(Ordering::Relaxed),
118            bytes_sent_uncompressed: self.bytes_sent_uncompressed.load(Ordering::Relaxed),
119            bytes_sent_compressed: self.bytes_sent_compressed.load(Ordering::Relaxed),
120        }
121    }
122
123    /// Reset all metrics to zero
124    pub fn reset(&self) {
125        self.total_requests.store(0, Ordering::Relaxed);
126        self.successful_requests.store(0, Ordering::Relaxed);
127        self.failed_requests.store(0, Ordering::Relaxed);
128        self.retried_requests.store(0, Ordering::Relaxed);
129        self.total_items_embedded.store(0, Ordering::Relaxed);
130        self.total_tokens_used.store(0, Ordering::Relaxed);
131        self.total_latency_ms.store(0, Ordering::Relaxed);
132        self.circuit_breaker_opens.store(0, Ordering::Relaxed);
133        self.circuit_breaker_closes.store(0, Ordering::Relaxed);
134        self.bytes_sent_uncompressed.store(0, Ordering::Relaxed);
135        self.bytes_sent_compressed.store(0, Ordering::Relaxed);
136    }
137}
138
139impl MetricsSnapshot {
140    /// Calculate success rate as a percentage
141    #[must_use]
142    pub fn success_rate(&self) -> f64 {
143        if self.total_requests == 0 {
144            return 0.0;
145        }
146        (self.successful_requests as f64 / self.total_requests as f64) * 100.0
147    }
148
149    /// Calculate failure rate as a percentage
150    #[must_use]
151    pub fn failure_rate(&self) -> f64 {
152        if self.total_requests == 0 {
153            return 0.0;
154        }
155        (self.failed_requests as f64 / self.total_requests as f64) * 100.0
156    }
157
158    /// Calculate retry rate as a percentage
159    #[must_use]
160    pub fn retry_rate(&self) -> f64 {
161        if self.total_requests == 0 {
162            return 0.0;
163        }
164        (self.retried_requests as f64 / self.total_requests as f64) * 100.0
165    }
166
167    /// Calculate compression ratio as a percentage
168    #[must_use]
169    pub fn compression_ratio(&self) -> f64 {
170        if self.bytes_sent_uncompressed == 0 {
171            return 0.0;
172        }
173        (self.bytes_sent_compressed as f64 / self.bytes_sent_uncompressed as f64) * 100.0
174    }
175
176    /// Calculate bandwidth saved by compression
177    #[must_use]
178    pub fn bytes_saved(&self) -> u64 {
179        self.bytes_sent_uncompressed
180            .saturating_sub(self.bytes_sent_compressed)
181    }
182
183    /// Estimate cost based on token usage (`OpenAI` pricing)
184    ///
185    /// # Arguments
186    /// * `cost_per_million_tokens` - Cost per 1M tokens (e.g., 0.02 for text-embedding-3-small)
187    #[must_use]
188    pub fn estimated_cost(&self, cost_per_million_tokens: f64) -> f64 {
189        (self.total_tokens_used as f64 / 1_000_000.0) * cost_per_million_tokens
190    }
191
192    /// Calculate average items per request
193    #[must_use]
194    pub fn average_batch_size(&self) -> f64 {
195        if self.successful_requests == 0 {
196            return 0.0;
197        }
198        self.total_items_embedded as f64 / self.successful_requests as f64
199    }
200
201    /// Format metrics as a human-readable string
202    #[must_use]
203    pub fn format(&self) -> String {
204        format!(
205            r"Provider Metrics:
206  Requests:        {} total ({} success, {} failed)
207  Success Rate:    {:.2}%
208  Retry Rate:      {:.2}%
209  Items Embedded:  {}
210  Tokens Used:     {}
211  Avg Latency:     {}ms
212  Avg Batch Size:  {:.1} items
213  Circuit Breaker: {} opens, {} closes
214  Compression:     {:.1}% ratio ({} bytes saved)",
215            self.total_requests,
216            self.successful_requests,
217            self.failed_requests,
218            self.success_rate(),
219            self.retry_rate(),
220            self.total_items_embedded,
221            self.total_tokens_used,
222            self.average_latency_ms,
223            self.average_batch_size(),
224            self.circuit_breaker_opens,
225            self.circuit_breaker_closes,
226            self.compression_ratio(),
227            self.bytes_saved()
228        )
229    }
230}
231
/// Helper to measure latency.
///
/// Captures an [`Instant`] when started and reports how much time has passed
/// since then.
#[derive(Debug, Clone, Copy)]
pub struct LatencyTimer {
    /// Moment the timer was started.
    start: Instant,
}

impl LatencyTimer {
    /// Start a new latency timer at the current instant.
    #[must_use]
    pub fn start() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Get the time elapsed since the timer was started, in whole milliseconds.
    ///
    /// The value saturates at `u64::MAX` instead of silently wrapping.
    #[must_use]
    pub fn elapsed_ms(&self) -> u64 {
        let elapsed = self.start.elapsed();
        // Built from the `u64` seconds and `u32` sub-second parts so no lossy
        // `u128 as u64` cast is needed; both steps saturate on overflow.
        elapsed
            .as_secs()
            .saturating_mul(1_000)
            .saturating_add(u64::from(elapsed.subsec_millis()))
    }
}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Assert that two floats agree to within a small absolute tolerance, so
    /// the tests do not depend on the exact last bit of a division result.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < 1e-9,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    /// A single successful request shows up in every related counter.
    #[test]
    fn test_metrics_recording() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(10, 1000, 100);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_requests, 1);
        assert_eq!(snapshot.successful_requests, 1);
        assert_eq!(snapshot.total_items_embedded, 10);
        assert_eq!(snapshot.total_tokens_used, 1000);
        assert_eq!(snapshot.average_latency_ms, 100);
    }

    /// Two successes and one failure give 2/3 and 1/3 rates.
    #[test]
    fn test_success_rate() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(1, 100, 50);
        metrics.record_request();
        metrics.record_success(1, 100, 50);
        metrics.record_request();
        metrics.record_failure();

        let snapshot = metrics.snapshot();
        assert_close(snapshot.success_rate(), 200.0 / 3.0);
        assert_close(snapshot.failure_rate(), 100.0 / 3.0);
    }

    /// With nothing recorded, every derived figure falls back to zero rather
    /// than dividing by zero.
    #[test]
    fn test_rates_without_requests() {
        let snapshot = ProviderMetrics::new().snapshot();

        assert_eq!(snapshot.average_latency_ms, 0);
        assert_close(snapshot.success_rate(), 0.0);
        assert_close(snapshot.failure_rate(), 0.0);
        assert_close(snapshot.retry_rate(), 0.0);
        assert_close(snapshot.compression_ratio(), 0.0);
        assert_close(snapshot.average_batch_size(), 0.0);
        assert_eq!(snapshot.bytes_saved(), 0);
    }

    /// Compression byte counts accumulate across calls.
    #[test]
    fn test_compression_metrics() {
        let metrics = ProviderMetrics::new();

        metrics.record_compression(1000, 300);
        metrics.record_compression(2000, 600);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.bytes_sent_uncompressed, 3000);
        assert_eq!(snapshot.bytes_sent_compressed, 900);
        assert_close(snapshot.compression_ratio(), 30.0);
        assert_eq!(snapshot.bytes_saved(), 2100);
    }

    /// 10,000 tokens at $0.02 per 1M tokens cost $0.0002.
    #[test]
    fn test_estimated_cost() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(100, 10_000, 100);

        let snapshot = metrics.snapshot();
        assert_close(snapshot.estimated_cost(0.02), 0.0002);
    }

    /// Batch size is averaged over successful requests.
    #[test]
    fn test_average_batch_size() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(100, 1000, 50);
        metrics.record_request();
        metrics.record_success(200, 2000, 60);

        let snapshot = metrics.snapshot();
        assert_close(snapshot.average_batch_size(), 150.0);
    }

    /// `reset` must clear every counter, not only the request counts.
    #[test]
    fn test_metrics_reset() {
        let metrics = ProviderMetrics::new();

        metrics.record_request();
        metrics.record_success(10, 1000, 100);
        metrics.record_failure();
        metrics.record_retry();
        metrics.record_circuit_breaker_open();
        metrics.record_circuit_breaker_close();
        metrics.record_compression(1000, 300);
        metrics.reset();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.total_requests, 0);
        assert_eq!(snapshot.successful_requests, 0);
        assert_eq!(snapshot.failed_requests, 0);
        assert_eq!(snapshot.retried_requests, 0);
        assert_eq!(snapshot.total_items_embedded, 0);
        assert_eq!(snapshot.total_tokens_used, 0);
        assert_eq!(snapshot.average_latency_ms, 0);
        assert_eq!(snapshot.circuit_breaker_opens, 0);
        assert_eq!(snapshot.circuit_breaker_closes, 0);
        assert_eq!(snapshot.bytes_sent_uncompressed, 0);
        assert_eq!(snapshot.bytes_sent_compressed, 0);
    }

    /// The timer reports at least the time actually slept.
    #[test]
    fn test_latency_timer() {
        let timer = LatencyTimer::start();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let elapsed = timer.elapsed_ms();
        assert!(elapsed >= 10);
    }
}
346}