Skip to main content

rusmes_storage/
metrics.rs

1//! Storage metrics collection and tracking
2//!
3//! This module provides comprehensive metrics tracking for storage operations including:
4//! - Disk usage tracking (per user, per mailbox, total)
5//! - Message counts (total, per mailbox, per state)
6//! - Operation latency histograms (append, fetch, delete, search)
7//! - Storage backend health metrics
8//! - Integration with Prometheus metrics
9//!
10//! ## Usage
11//!
12//! ```rust,no_run
13//! use rusmes_storage::metrics::StorageMetrics;
14//!
15//! # async fn example() -> anyhow::Result<()> {
16//! let metrics = StorageMetrics::new();
17//!
18//! // Track message operations
19//! metrics.inc_messages_total(1);
20//! metrics.add_disk_usage_bytes(1024);
21//!
22//! // Track operation latency
23//! let timer = metrics.start_append_timer();
24//! // ... perform append operation ...
25//! timer.observe();
26//!
27//! // Export to Prometheus format
28//! let prometheus_output = metrics.export_prometheus();
29//! # Ok(())
30//! # }
31//! ```
32
33use std::collections::HashMap;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{Arc, RwLock};
36use std::time::Instant;
37
/// Latency histogram with fixed, cumulative buckets.
///
/// Bucket bounds are expressed in seconds. An observation increments every bucket whose
/// upper bound is greater than or equal to the observed value, which is the cumulative
/// layout used by the Prometheus text format. All state lives behind `Arc`s, so a clone
/// shares its counters with the value it was cloned from.
#[derive(Debug, Clone)]
pub struct Histogram {
    /// Finite bucket upper bounds in seconds, sorted ascending with duplicates removed.
    buckets: Vec<f64>,
    /// Cumulative observation count for the bucket at the same index in `buckets`.
    counts: Vec<Arc<AtomicU64>>,
    /// Sum of all observations in whole nanoseconds, so that sub-millisecond samples are
    /// not truncated to zero.
    sum: Arc<AtomicU64>,
    /// Total number of recorded observations (the implicit `+Inf` bucket).
    count: Arc<AtomicU64>,
}

impl Histogram {
    /// Nanoseconds per second, used to convert observed seconds to the stored sum.
    const NANOS_PER_SECOND: f64 = 1_000_000_000.0;
    /// Nanoseconds per millisecond, used by [`Histogram::get_sum`].
    const NANOS_PER_MILLI: u64 = 1_000_000;

    /// Create a new histogram with specified bucket boundaries (in seconds)
    ///
    /// Non-finite bounds are dropped (the `+Inf` bucket is always emitted by
    /// [`Histogram::export`]), and the remaining bounds are sorted and deduplicated so the
    /// exported `le` series is ascending and free of duplicates.
    pub fn new(buckets: Vec<f64>) -> Self {
        let mut buckets = buckets;
        buckets.retain(|bound| bound.is_finite());
        // Only finite values remain, so `partial_cmp` always yields an ordering.
        buckets.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        buckets.dedup();

        let counts = buckets
            .iter()
            .map(|_| Arc::new(AtomicU64::new(0)))
            .collect();

        Self {
            buckets,
            counts,
            sum: Arc::new(AtomicU64::new(0)),
            count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Observe a value (in seconds)
    ///
    /// Negative, NaN and infinite values are ignored: they cannot be valid latencies and
    /// would otherwise corrupt the running sum.
    pub fn observe(&self, value: f64) {
        if !value.is_finite() || value < 0.0 {
            return;
        }

        // Float-to-int `as` casts saturate, so an absurdly large value cannot wrap here.
        let nanos = (value * Self::NANOS_PER_SECOND).round() as u64;
        self.sum.fetch_add(nanos, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        // Buckets are cumulative: bump every bucket whose upper bound covers the value.
        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            if value <= *bound {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Export histogram in Prometheus format
    ///
    /// Emits `# HELP` and `# TYPE` lines, one `<name>_bucket` line per configured bound,
    /// the `+Inf` bucket, `<name>_sum` (in seconds) and `<name>_count`.
    pub fn export(&self, name: &str, help: &str) -> String {
        let mut output = String::new();
        output.push_str(&format!("# HELP {} {}\n", name, help));
        output.push_str(&format!("# TYPE {} histogram\n", name));

        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            output.push_str(&format!(
                "{}_bucket{{le=\"{}\"}} {}\n",
                name,
                bound,
                counter.load(Ordering::Relaxed)
            ));
        }

        // Read the total once so the `+Inf` bucket and `_count` lines always agree.
        let total = self.count.load(Ordering::Relaxed);
        let sum_seconds = self.sum.load(Ordering::Relaxed) as f64 / Self::NANOS_PER_SECOND;

        output.push_str(&format!("{}_bucket{{le=\"+Inf\"}} {}\n", name, total));
        output.push_str(&format!("{}_sum {}\n", name, sum_seconds));
        output.push_str(&format!("{}_count {}\n", name, total));

        output
    }

    /// Get total count of observations
    pub fn get_count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Get sum of all observations (in whole milliseconds, truncated)
    pub fn get_sum(&self) -> u64 {
        self.sum.load(Ordering::Relaxed) / Self::NANOS_PER_MILLI
    }

    /// Calculate average (in seconds)
    ///
    /// Returns `0.0` when nothing has been observed yet.
    pub fn average(&self) -> f64 {
        let count = self.get_count();
        if count == 0 {
            0.0
        } else {
            let sum_seconds = self.sum.load(Ordering::Relaxed) as f64 / Self::NANOS_PER_SECOND;
            sum_seconds / count as f64
        }
    }
}
125
126/// Timer for tracking operation duration
127pub struct StorageTimer {
128    start: Instant,
129    histogram: Arc<Histogram>,
130}
131
132impl StorageTimer {
133    fn new(histogram: Arc<Histogram>) -> Self {
134        Self {
135            start: Instant::now(),
136            histogram,
137        }
138    }
139
140    /// Observe and record the elapsed time
141    pub fn observe(self) {
142        let duration = self.start.elapsed().as_secs_f64();
143        self.histogram.observe(duration);
144    }
145
146    /// Get elapsed time without recording
147    pub fn elapsed(&self) -> f64 {
148        self.start.elapsed().as_secs_f64()
149    }
150}
151
152/// Storage metrics collector
153#[derive(Debug, Clone)]
154pub struct StorageMetrics {
155    // Message counts
156    messages_total: Arc<AtomicU64>,
157    messages_deleted: Arc<AtomicU64>,
158    messages_flagged: Arc<AtomicU64>,
159    messages_seen: Arc<AtomicU64>,
160    messages_unseen: Arc<AtomicU64>,
161
162    // Mailbox counts
163    mailboxes_total: Arc<AtomicU64>,
164    mailboxes_created: Arc<AtomicU64>,
165    mailboxes_deleted: Arc<AtomicU64>,
166
167    // Disk usage (in bytes)
168    disk_usage_total_bytes: Arc<AtomicU64>,
169
170    // Per-user disk usage (in bytes)
171    disk_usage_per_user: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
172
173    // Per-mailbox message counts
174    messages_per_mailbox: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
175
176    // Operation counters
177    append_operations_total: Arc<AtomicU64>,
178    fetch_operations_total: Arc<AtomicU64>,
179    delete_operations_total: Arc<AtomicU64>,
180    search_operations_total: Arc<AtomicU64>,
181    copy_operations_total: Arc<AtomicU64>,
182
183    // Operation error counters
184    append_errors_total: Arc<AtomicU64>,
185    fetch_errors_total: Arc<AtomicU64>,
186    delete_errors_total: Arc<AtomicU64>,
187    search_errors_total: Arc<AtomicU64>,
188
189    // Backend health
190    backend_healthy: Arc<AtomicU64>,
191    backend_last_check: Arc<AtomicU64>,
192
193    // Operation latency histograms
194    append_latency: Arc<Histogram>,
195    fetch_latency: Arc<Histogram>,
196    delete_latency: Arc<Histogram>,
197    search_latency: Arc<Histogram>,
198}
199
200impl Default for StorageMetrics {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl StorageMetrics {
207    /// Create a new storage metrics collector
208    pub fn new() -> Self {
209        // Define histogram buckets for operation latency (in seconds)
210        // Optimized for storage operations: 1ms to 10s
211        let latency_buckets = vec![
212            0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
213        ];
214
215        Self {
216            messages_total: Arc::new(AtomicU64::new(0)),
217            messages_deleted: Arc::new(AtomicU64::new(0)),
218            messages_flagged: Arc::new(AtomicU64::new(0)),
219            messages_seen: Arc::new(AtomicU64::new(0)),
220            messages_unseen: Arc::new(AtomicU64::new(0)),
221            mailboxes_total: Arc::new(AtomicU64::new(0)),
222            mailboxes_created: Arc::new(AtomicU64::new(0)),
223            mailboxes_deleted: Arc::new(AtomicU64::new(0)),
224            disk_usage_total_bytes: Arc::new(AtomicU64::new(0)),
225            disk_usage_per_user: Arc::new(RwLock::new(HashMap::new())),
226            messages_per_mailbox: Arc::new(RwLock::new(HashMap::new())),
227            append_operations_total: Arc::new(AtomicU64::new(0)),
228            fetch_operations_total: Arc::new(AtomicU64::new(0)),
229            delete_operations_total: Arc::new(AtomicU64::new(0)),
230            search_operations_total: Arc::new(AtomicU64::new(0)),
231            copy_operations_total: Arc::new(AtomicU64::new(0)),
232            append_errors_total: Arc::new(AtomicU64::new(0)),
233            fetch_errors_total: Arc::new(AtomicU64::new(0)),
234            delete_errors_total: Arc::new(AtomicU64::new(0)),
235            search_errors_total: Arc::new(AtomicU64::new(0)),
236            backend_healthy: Arc::new(AtomicU64::new(1)),
237            backend_last_check: Arc::new(AtomicU64::new(0)),
238            append_latency: Arc::new(Histogram::new(latency_buckets.clone())),
239            fetch_latency: Arc::new(Histogram::new(latency_buckets.clone())),
240            delete_latency: Arc::new(Histogram::new(latency_buckets.clone())),
241            search_latency: Arc::new(Histogram::new(latency_buckets)),
242        }
243    }
244
245    // Message count metrics
246
247    /// Increment total message count
248    pub fn inc_messages_total(&self, count: u64) {
249        self.messages_total.fetch_add(count, Ordering::Relaxed);
250    }
251
252    /// Decrement total message count
253    pub fn dec_messages_total(&self, count: u64) {
254        self.messages_total.fetch_sub(count, Ordering::Relaxed);
255    }
256
257    /// Set total message count
258    pub fn set_messages_total(&self, count: u64) {
259        self.messages_total.store(count, Ordering::Relaxed);
260    }
261
262    /// Get total message count
263    pub fn get_messages_total(&self) -> u64 {
264        self.messages_total.load(Ordering::Relaxed)
265    }
266
267    /// Increment deleted message count
268    pub fn inc_messages_deleted(&self, count: u64) {
269        self.messages_deleted.fetch_add(count, Ordering::Relaxed);
270    }
271
272    /// Increment flagged message count
273    pub fn inc_messages_flagged(&self) {
274        self.messages_flagged.fetch_add(1, Ordering::Relaxed);
275    }
276
277    /// Decrement flagged message count
278    pub fn dec_messages_flagged(&self) {
279        self.messages_flagged.fetch_sub(1, Ordering::Relaxed);
280    }
281
282    /// Increment seen message count
283    pub fn inc_messages_seen(&self) {
284        self.messages_seen.fetch_add(1, Ordering::Relaxed);
285    }
286
287    /// Decrement seen message count
288    pub fn dec_messages_seen(&self) {
289        self.messages_seen.fetch_sub(1, Ordering::Relaxed);
290    }
291
292    /// Increment unseen message count
293    pub fn inc_messages_unseen(&self) {
294        self.messages_unseen.fetch_add(1, Ordering::Relaxed);
295    }
296
297    /// Decrement unseen message count
298    pub fn dec_messages_unseen(&self) {
299        self.messages_unseen.fetch_sub(1, Ordering::Relaxed);
300    }
301
302    // Mailbox count metrics
303
304    /// Increment mailbox count
305    pub fn inc_mailboxes_total(&self) {
306        self.mailboxes_total.fetch_add(1, Ordering::Relaxed);
307    }
308
309    /// Decrement mailbox count
310    pub fn dec_mailboxes_total(&self) {
311        self.mailboxes_total.fetch_sub(1, Ordering::Relaxed);
312    }
313
314    /// Set mailbox count
315    pub fn set_mailboxes_total(&self, count: u64) {
316        self.mailboxes_total.store(count, Ordering::Relaxed);
317    }
318
319    /// Increment mailbox created counter
320    pub fn inc_mailboxes_created(&self) {
321        self.mailboxes_created.fetch_add(1, Ordering::Relaxed);
322        self.inc_mailboxes_total();
323    }
324
325    /// Increment mailbox deleted counter
326    pub fn inc_mailboxes_deleted(&self) {
327        self.mailboxes_deleted.fetch_add(1, Ordering::Relaxed);
328        self.dec_mailboxes_total();
329    }
330
331    // Disk usage metrics
332
333    /// Add to total disk usage
334    pub fn add_disk_usage_bytes(&self, bytes: u64) {
335        self.disk_usage_total_bytes
336            .fetch_add(bytes, Ordering::Relaxed);
337    }
338
339    /// Subtract from total disk usage
340    pub fn sub_disk_usage_bytes(&self, bytes: u64) {
341        self.disk_usage_total_bytes
342            .fetch_sub(bytes, Ordering::Relaxed);
343    }
344
345    /// Set total disk usage
346    pub fn set_disk_usage_bytes(&self, bytes: u64) {
347        self.disk_usage_total_bytes.store(bytes, Ordering::Relaxed);
348    }
349
350    /// Get total disk usage
351    pub fn get_disk_usage_bytes(&self) -> u64 {
352        self.disk_usage_total_bytes.load(Ordering::Relaxed)
353    }
354
355    /// Set disk usage for a specific user
356    pub fn set_user_disk_usage(&self, user: &str, bytes: u64) {
357        if let Ok(mut map) = self.disk_usage_per_user.write() {
358            map.entry(user.to_string())
359                .or_insert_with(|| Arc::new(AtomicU64::new(0)))
360                .store(bytes, Ordering::Relaxed);
361        }
362    }
363
364    /// Add to disk usage for a specific user
365    pub fn add_user_disk_usage(&self, user: &str, bytes: u64) {
366        let found = if let Ok(map) = self.disk_usage_per_user.read() {
367            if let Some(counter) = map.get(user) {
368                counter.fetch_add(bytes, Ordering::Relaxed);
369                true
370            } else {
371                false
372            }
373        } else {
374            false
375        };
376        if !found {
377            self.set_user_disk_usage(user, bytes);
378        }
379        self.add_disk_usage_bytes(bytes);
380    }
381
382    /// Subtract from disk usage for a specific user
383    pub fn sub_user_disk_usage(&self, user: &str, bytes: u64) {
384        if let Ok(map) = self.disk_usage_per_user.read() {
385            if let Some(counter) = map.get(user) {
386                counter.fetch_sub(bytes, Ordering::Relaxed);
387            }
388        }
389        self.sub_disk_usage_bytes(bytes);
390    }
391
392    /// Get disk usage for a specific user
393    pub fn get_user_disk_usage(&self, user: &str) -> u64 {
394        self.disk_usage_per_user
395            .read()
396            .ok()
397            .and_then(|map| map.get(user).map(|c| c.load(Ordering::Relaxed)))
398            .unwrap_or(0)
399    }
400
401    // Per-mailbox message counts
402
403    /// Set message count for a specific mailbox
404    pub fn set_mailbox_message_count(&self, mailbox_id: &str, count: u64) {
405        if let Ok(mut map) = self.messages_per_mailbox.write() {
406            map.entry(mailbox_id.to_string())
407                .or_insert_with(|| Arc::new(AtomicU64::new(0)))
408                .store(count, Ordering::Relaxed);
409        }
410    }
411
412    /// Increment message count for a specific mailbox
413    pub fn inc_mailbox_message_count(&self, mailbox_id: &str, count: u64) {
414        let found = if let Ok(map) = self.messages_per_mailbox.read() {
415            if let Some(counter) = map.get(mailbox_id) {
416                counter.fetch_add(count, Ordering::Relaxed);
417                true
418            } else {
419                false
420            }
421        } else {
422            false
423        };
424        if !found {
425            self.set_mailbox_message_count(mailbox_id, count);
426        }
427    }
428
429    /// Decrement message count for a specific mailbox
430    pub fn dec_mailbox_message_count(&self, mailbox_id: &str, count: u64) {
431        if let Ok(map) = self.messages_per_mailbox.read() {
432            if let Some(counter) = map.get(mailbox_id) {
433                counter.fetch_sub(count, Ordering::Relaxed);
434            }
435        }
436    }
437
438    /// Get message count for a specific mailbox
439    pub fn get_mailbox_message_count(&self, mailbox_id: &str) -> u64 {
440        self.messages_per_mailbox
441            .read()
442            .ok()
443            .and_then(|map| map.get(mailbox_id).map(|c| c.load(Ordering::Relaxed)))
444            .unwrap_or(0)
445    }
446
447    // Operation counters
448
449    /// Increment append operation counter
450    pub fn inc_append_operations(&self) {
451        self.append_operations_total.fetch_add(1, Ordering::Relaxed);
452    }
453
454    /// Increment fetch operation counter
455    pub fn inc_fetch_operations(&self) {
456        self.fetch_operations_total.fetch_add(1, Ordering::Relaxed);
457    }
458
459    /// Increment delete operation counter
460    pub fn inc_delete_operations(&self) {
461        self.delete_operations_total.fetch_add(1, Ordering::Relaxed);
462    }
463
464    /// Increment search operation counter
465    pub fn inc_search_operations(&self) {
466        self.search_operations_total.fetch_add(1, Ordering::Relaxed);
467    }
468
469    /// Increment copy operation counter
470    pub fn inc_copy_operations(&self) {
471        self.copy_operations_total.fetch_add(1, Ordering::Relaxed);
472    }
473
474    // Error counters
475
476    /// Increment append error counter
477    pub fn inc_append_errors(&self) {
478        self.append_errors_total.fetch_add(1, Ordering::Relaxed);
479    }
480
481    /// Increment fetch error counter
482    pub fn inc_fetch_errors(&self) {
483        self.fetch_errors_total.fetch_add(1, Ordering::Relaxed);
484    }
485
486    /// Increment delete error counter
487    pub fn inc_delete_errors(&self) {
488        self.delete_errors_total.fetch_add(1, Ordering::Relaxed);
489    }
490
491    /// Increment search error counter
492    pub fn inc_search_errors(&self) {
493        self.search_errors_total.fetch_add(1, Ordering::Relaxed);
494    }
495
496    // Backend health metrics
497
498    /// Set backend healthy status (1 = healthy, 0 = unhealthy)
499    pub fn set_backend_healthy(&self, healthy: bool) {
500        self.backend_healthy
501            .store(if healthy { 1 } else { 0 }, Ordering::Relaxed);
502    }
503
504    /// Get backend healthy status
505    pub fn is_backend_healthy(&self) -> bool {
506        self.backend_healthy.load(Ordering::Relaxed) == 1
507    }
508
509    /// Update last health check timestamp (Unix timestamp in seconds)
510    pub fn update_health_check_time(&self, timestamp: u64) {
511        self.backend_last_check.store(timestamp, Ordering::Relaxed);
512    }
513
514    /// Get last health check timestamp
515    pub fn get_last_health_check(&self) -> u64 {
516        self.backend_last_check.load(Ordering::Relaxed)
517    }
518
519    // Latency timing
520
521    /// Start timer for append operation
522    pub fn start_append_timer(&self) -> StorageTimer {
523        StorageTimer::new(Arc::clone(&self.append_latency))
524    }
525
526    /// Start timer for fetch operation
527    pub fn start_fetch_timer(&self) -> StorageTimer {
528        StorageTimer::new(Arc::clone(&self.fetch_latency))
529    }
530
531    /// Start timer for delete operation
532    pub fn start_delete_timer(&self) -> StorageTimer {
533        StorageTimer::new(Arc::clone(&self.delete_latency))
534    }
535
536    /// Start timer for search operation
537    pub fn start_search_timer(&self) -> StorageTimer {
538        StorageTimer::new(Arc::clone(&self.search_latency))
539    }
540
541    // Helper methods for common operations
542
543    /// Record successful append operation with message size
544    pub fn record_append_success(&self, size_bytes: u64) {
545        self.inc_append_operations();
546        self.inc_messages_total(1);
547        self.add_disk_usage_bytes(size_bytes);
548    }
549
550    /// Record failed append operation
551    pub fn record_append_failure(&self) {
552        self.inc_append_errors();
553    }
554
555    /// Record successful delete operation with message size
556    pub fn record_delete_success(&self, size_bytes: u64, count: u64) {
557        self.inc_delete_operations();
558        self.dec_messages_total(count);
559        self.inc_messages_deleted(count);
560        self.sub_disk_usage_bytes(size_bytes);
561    }
562
563    /// Record failed delete operation
564    pub fn record_delete_failure(&self) {
565        self.inc_delete_errors();
566    }
567
568    /// Record successful fetch operation
569    pub fn record_fetch_success(&self) {
570        self.inc_fetch_operations();
571    }
572
573    /// Record failed fetch operation
574    pub fn record_fetch_failure(&self) {
575        self.inc_fetch_errors();
576    }
577
578    /// Record successful search operation
579    pub fn record_search_success(&self) {
580        self.inc_search_operations();
581    }
582
583    /// Record failed search operation
584    pub fn record_search_failure(&self) {
585        self.inc_search_errors();
586    }
587
588    /// Export metrics in Prometheus text format
589    pub fn export_prometheus(&self) -> String {
590        let mut output = String::new();
591
592        // Message count metrics
593        output
594            .push_str("# HELP rusmes_storage_messages_total Total number of messages in storage\n");
595        output.push_str("# TYPE rusmes_storage_messages_total gauge\n");
596        output.push_str(&format!(
597            "rusmes_storage_messages_total {}\n",
598            self.messages_total.load(Ordering::Relaxed)
599        ));
600
601        output.push_str(
602            "# HELP rusmes_storage_messages_deleted_total Total number of deleted messages\n",
603        );
604        output.push_str("# TYPE rusmes_storage_messages_deleted_total counter\n");
605        output.push_str(&format!(
606            "rusmes_storage_messages_deleted_total {}\n",
607            self.messages_deleted.load(Ordering::Relaxed)
608        ));
609
610        output.push_str("# HELP rusmes_storage_messages_flagged Number of flagged messages\n");
611        output.push_str("# TYPE rusmes_storage_messages_flagged gauge\n");
612        output.push_str(&format!(
613            "rusmes_storage_messages_flagged {}\n",
614            self.messages_flagged.load(Ordering::Relaxed)
615        ));
616
617        output.push_str("# HELP rusmes_storage_messages_seen Number of seen messages\n");
618        output.push_str("# TYPE rusmes_storage_messages_seen gauge\n");
619        output.push_str(&format!(
620            "rusmes_storage_messages_seen {}\n",
621            self.messages_seen.load(Ordering::Relaxed)
622        ));
623
624        output.push_str("# HELP rusmes_storage_messages_unseen Number of unseen messages\n");
625        output.push_str("# TYPE rusmes_storage_messages_unseen gauge\n");
626        output.push_str(&format!(
627            "rusmes_storage_messages_unseen {}\n",
628            self.messages_unseen.load(Ordering::Relaxed)
629        ));
630
631        // Mailbox count metrics
632        output.push_str("# HELP rusmes_storage_mailboxes_total Total number of mailboxes\n");
633        output.push_str("# TYPE rusmes_storage_mailboxes_total gauge\n");
634        output.push_str(&format!(
635            "rusmes_storage_mailboxes_total {}\n",
636            self.mailboxes_total.load(Ordering::Relaxed)
637        ));
638
639        output.push_str(
640            "# HELP rusmes_storage_mailboxes_created_total Total number of mailboxes created\n",
641        );
642        output.push_str("# TYPE rusmes_storage_mailboxes_created_total counter\n");
643        output.push_str(&format!(
644            "rusmes_storage_mailboxes_created_total {}\n",
645            self.mailboxes_created.load(Ordering::Relaxed)
646        ));
647
648        output.push_str(
649            "# HELP rusmes_storage_mailboxes_deleted_total Total number of mailboxes deleted\n",
650        );
651        output.push_str("# TYPE rusmes_storage_mailboxes_deleted_total counter\n");
652        output.push_str(&format!(
653            "rusmes_storage_mailboxes_deleted_total {}\n",
654            self.mailboxes_deleted.load(Ordering::Relaxed)
655        ));
656
657        // Disk usage metrics
658        output.push_str("# HELP rusmes_storage_disk_usage_bytes Total disk usage in bytes\n");
659        output.push_str("# TYPE rusmes_storage_disk_usage_bytes gauge\n");
660        output.push_str(&format!(
661            "rusmes_storage_disk_usage_bytes {}\n",
662            self.disk_usage_total_bytes.load(Ordering::Relaxed)
663        ));
664
665        // Per-user disk usage
666        if let Ok(map) = self.disk_usage_per_user.read() {
667            if !map.is_empty() {
668                output.push_str(
669                    "# HELP rusmes_storage_user_disk_usage_bytes Disk usage per user in bytes\n",
670                );
671                output.push_str("# TYPE rusmes_storage_user_disk_usage_bytes gauge\n");
672                for (user, counter) in map.iter() {
673                    output.push_str(&format!(
674                        "rusmes_storage_user_disk_usage_bytes{{user=\"{}\"}} {}\n",
675                        user,
676                        counter.load(Ordering::Relaxed)
677                    ));
678                }
679            }
680        }
681
682        // Per-mailbox message counts
683        if let Ok(map) = self.messages_per_mailbox.read() {
684            if !map.is_empty() {
685                output
686                    .push_str("# HELP rusmes_storage_mailbox_messages Message count per mailbox\n");
687                output.push_str("# TYPE rusmes_storage_mailbox_messages gauge\n");
688                for (mailbox_id, counter) in map.iter() {
689                    output.push_str(&format!(
690                        "rusmes_storage_mailbox_messages{{mailbox=\"{}\"}} {}\n",
691                        mailbox_id,
692                        counter.load(Ordering::Relaxed)
693                    ));
694                }
695            }
696        }
697
698        // Operation counters
699        output.push_str("# HELP rusmes_storage_append_operations_total Total append operations\n");
700        output.push_str("# TYPE rusmes_storage_append_operations_total counter\n");
701        output.push_str(&format!(
702            "rusmes_storage_append_operations_total {}\n",
703            self.append_operations_total.load(Ordering::Relaxed)
704        ));
705
706        output.push_str("# HELP rusmes_storage_fetch_operations_total Total fetch operations\n");
707        output.push_str("# TYPE rusmes_storage_fetch_operations_total counter\n");
708        output.push_str(&format!(
709            "rusmes_storage_fetch_operations_total {}\n",
710            self.fetch_operations_total.load(Ordering::Relaxed)
711        ));
712
713        output.push_str("# HELP rusmes_storage_delete_operations_total Total delete operations\n");
714        output.push_str("# TYPE rusmes_storage_delete_operations_total counter\n");
715        output.push_str(&format!(
716            "rusmes_storage_delete_operations_total {}\n",
717            self.delete_operations_total.load(Ordering::Relaxed)
718        ));
719
720        output.push_str("# HELP rusmes_storage_search_operations_total Total search operations\n");
721        output.push_str("# TYPE rusmes_storage_search_operations_total counter\n");
722        output.push_str(&format!(
723            "rusmes_storage_search_operations_total {}\n",
724            self.search_operations_total.load(Ordering::Relaxed)
725        ));
726
727        output.push_str("# HELP rusmes_storage_copy_operations_total Total copy operations\n");
728        output.push_str("# TYPE rusmes_storage_copy_operations_total counter\n");
729        output.push_str(&format!(
730            "rusmes_storage_copy_operations_total {}\n",
731            self.copy_operations_total.load(Ordering::Relaxed)
732        ));
733
734        // Error counters
735        output.push_str("# HELP rusmes_storage_append_errors_total Total append errors\n");
736        output.push_str("# TYPE rusmes_storage_append_errors_total counter\n");
737        output.push_str(&format!(
738            "rusmes_storage_append_errors_total {}\n",
739            self.append_errors_total.load(Ordering::Relaxed)
740        ));
741
742        output.push_str("# HELP rusmes_storage_fetch_errors_total Total fetch errors\n");
743        output.push_str("# TYPE rusmes_storage_fetch_errors_total counter\n");
744        output.push_str(&format!(
745            "rusmes_storage_fetch_errors_total {}\n",
746            self.fetch_errors_total.load(Ordering::Relaxed)
747        ));
748
749        output.push_str("# HELP rusmes_storage_delete_errors_total Total delete errors\n");
750        output.push_str("# TYPE rusmes_storage_delete_errors_total counter\n");
751        output.push_str(&format!(
752            "rusmes_storage_delete_errors_total {}\n",
753            self.delete_errors_total.load(Ordering::Relaxed)
754        ));
755
756        output.push_str("# HELP rusmes_storage_search_errors_total Total search errors\n");
757        output.push_str("# TYPE rusmes_storage_search_errors_total counter\n");
758        output.push_str(&format!(
759            "rusmes_storage_search_errors_total {}\n",
760            self.search_errors_total.load(Ordering::Relaxed)
761        ));
762
763        // Backend health
764        output.push_str("# HELP rusmes_storage_backend_healthy Backend health status (1=healthy, 0=unhealthy)\n");
765        output.push_str("# TYPE rusmes_storage_backend_healthy gauge\n");
766        output.push_str(&format!(
767            "rusmes_storage_backend_healthy {}\n",
768            self.backend_healthy.load(Ordering::Relaxed)
769        ));
770
771        output.push_str(
772            "# HELP rusmes_storage_backend_last_check_timestamp Last health check timestamp\n",
773        );
774        output.push_str("# TYPE rusmes_storage_backend_last_check_timestamp gauge\n");
775        output.push_str(&format!(
776            "rusmes_storage_backend_last_check_timestamp {}\n",
777            self.backend_last_check.load(Ordering::Relaxed)
778        ));
779
780        // Latency histograms
781        output.push_str(&self.append_latency.export(
782            "rusmes_storage_append_latency_seconds",
783            "Append operation latency in seconds",
784        ));
785
786        output.push_str(&self.fetch_latency.export(
787            "rusmes_storage_fetch_latency_seconds",
788            "Fetch operation latency in seconds",
789        ));
790
791        output.push_str(&self.delete_latency.export(
792            "rusmes_storage_delete_latency_seconds",
793            "Delete operation latency in seconds",
794        ));
795
796        output.push_str(&self.search_latency.export(
797            "rusmes_storage_search_latency_seconds",
798            "Search operation latency in seconds",
799        ));
800
801        output
802    }
803
804    /// Get a summary of current metrics
805    pub fn get_summary(&self) -> MetricsSummary {
806        MetricsSummary {
807            messages_total: self.messages_total.load(Ordering::Relaxed),
808            messages_deleted: self.messages_deleted.load(Ordering::Relaxed),
809            mailboxes_total: self.mailboxes_total.load(Ordering::Relaxed),
810            disk_usage_bytes: self.disk_usage_total_bytes.load(Ordering::Relaxed),
811            append_operations: self.append_operations_total.load(Ordering::Relaxed),
812            fetch_operations: self.fetch_operations_total.load(Ordering::Relaxed),
813            delete_operations: self.delete_operations_total.load(Ordering::Relaxed),
814            search_operations: self.search_operations_total.load(Ordering::Relaxed),
815            append_errors: self.append_errors_total.load(Ordering::Relaxed),
816            fetch_errors: self.fetch_errors_total.load(Ordering::Relaxed),
817            delete_errors: self.delete_errors_total.load(Ordering::Relaxed),
818            search_errors: self.search_errors_total.load(Ordering::Relaxed),
819            backend_healthy: self.is_backend_healthy(),
820            append_avg_latency_ms: self.append_latency.average() * 1000.0,
821            fetch_avg_latency_ms: self.fetch_latency.average() * 1000.0,
822            delete_avg_latency_ms: self.delete_latency.average() * 1000.0,
823            search_avg_latency_ms: self.search_latency.average() * 1000.0,
824        }
825    }
826}
827
/// Plain-value snapshot of the headline storage metrics.
///
/// Every field is an ordinary number or flag, so a summary can be cloned, logged with
/// `{:?}` or passed around freely.
#[derive(Clone, Debug)]
pub struct MetricsSummary {
    /// Messages currently in storage.
    pub messages_total: u64,
    /// Messages deleted so far.
    pub messages_deleted: u64,
    /// Mailboxes currently present.
    pub mailboxes_total: u64,
    /// Total disk usage in bytes.
    pub disk_usage_bytes: u64,
    /// Number of append operations recorded.
    pub append_operations: u64,
    /// Number of fetch operations recorded.
    pub fetch_operations: u64,
    /// Number of delete operations recorded.
    pub delete_operations: u64,
    /// Number of search operations recorded.
    pub search_operations: u64,
    /// Number of append errors recorded.
    pub append_errors: u64,
    /// Number of fetch errors recorded.
    pub fetch_errors: u64,
    /// Number of delete errors recorded.
    pub delete_errors: u64,
    /// Number of search errors recorded.
    pub search_errors: u64,
    /// Whether the storage backend was reported healthy.
    pub backend_healthy: bool,
    /// Average append latency in milliseconds.
    pub append_avg_latency_ms: f64,
    /// Average fetch latency in milliseconds.
    pub fetch_avg_latency_ms: f64,
    /// Average delete latency in milliseconds.
    pub delete_avg_latency_ms: f64,
    /// Average search latency in milliseconds.
    pub search_avg_latency_ms: f64,
}
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853
854    #[test]
855    fn test_histogram_observe() {
856        let hist = Histogram::new(vec![0.001, 0.01, 0.1, 1.0]);
857
858        hist.observe(0.005);
859        hist.observe(0.05);
860        hist.observe(0.5);
861
862        assert_eq!(hist.get_count(), 3);
863        assert!(hist.average() > 0.0);
864    }
865
866    #[test]
867    fn test_storage_metrics_messages() {
868        let metrics = StorageMetrics::new();
869
870        metrics.inc_messages_total(5);
871        assert_eq!(metrics.get_messages_total(), 5);
872
873        metrics.dec_messages_total(2);
874        assert_eq!(metrics.get_messages_total(), 3);
875
876        metrics.set_messages_total(10);
877        assert_eq!(metrics.get_messages_total(), 10);
878    }
879
880    #[test]
881    fn test_storage_metrics_disk_usage() {
882        let metrics = StorageMetrics::new();
883
884        metrics.add_disk_usage_bytes(1024);
885        assert_eq!(metrics.get_disk_usage_bytes(), 1024);
886
887        metrics.sub_disk_usage_bytes(512);
888        assert_eq!(metrics.get_disk_usage_bytes(), 512);
889
890        metrics.set_disk_usage_bytes(2048);
891        assert_eq!(metrics.get_disk_usage_bytes(), 2048);
892    }
893
894    #[test]
895    fn test_storage_metrics_per_user() {
896        let metrics = StorageMetrics::new();
897
898        metrics.add_user_disk_usage("user1", 1024);
899        metrics.add_user_disk_usage("user2", 2048);
900
901        assert_eq!(metrics.get_user_disk_usage("user1"), 1024);
902        assert_eq!(metrics.get_user_disk_usage("user2"), 2048);
903        assert_eq!(metrics.get_disk_usage_bytes(), 3072);
904
905        metrics.sub_user_disk_usage("user1", 512);
906        assert_eq!(metrics.get_user_disk_usage("user1"), 512);
907        assert_eq!(metrics.get_disk_usage_bytes(), 2560);
908    }
909
910    #[test]
911    fn test_storage_metrics_per_mailbox() {
912        let metrics = StorageMetrics::new();
913
914        metrics.set_mailbox_message_count("mailbox1", 10);
915        metrics.inc_mailbox_message_count("mailbox1", 5);
916
917        assert_eq!(metrics.get_mailbox_message_count("mailbox1"), 15);
918
919        metrics.dec_mailbox_message_count("mailbox1", 3);
920        assert_eq!(metrics.get_mailbox_message_count("mailbox1"), 12);
921    }
922
923    #[test]
924    fn test_storage_metrics_operations() {
925        let metrics = StorageMetrics::new();
926
927        metrics.inc_append_operations();
928        metrics.inc_fetch_operations();
929        metrics.inc_delete_operations();
930        metrics.inc_search_operations();
931
932        let summary = metrics.get_summary();
933        assert_eq!(summary.append_operations, 1);
934        assert_eq!(summary.fetch_operations, 1);
935        assert_eq!(summary.delete_operations, 1);
936        assert_eq!(summary.search_operations, 1);
937    }
938
939    #[test]
940    fn test_storage_metrics_errors() {
941        let metrics = StorageMetrics::new();
942
943        metrics.inc_append_errors();
944        metrics.inc_fetch_errors();
945        metrics.inc_delete_errors();
946        metrics.inc_search_errors();
947
948        let summary = metrics.get_summary();
949        assert_eq!(summary.append_errors, 1);
950        assert_eq!(summary.fetch_errors, 1);
951        assert_eq!(summary.delete_errors, 1);
952        assert_eq!(summary.search_errors, 1);
953    }
954
955    #[test]
956    fn test_storage_metrics_backend_health() {
957        let metrics = StorageMetrics::new();
958
959        assert!(metrics.is_backend_healthy());
960
961        metrics.set_backend_healthy(false);
962        assert!(!metrics.is_backend_healthy());
963
964        metrics.set_backend_healthy(true);
965        assert!(metrics.is_backend_healthy());
966    }
967
968    #[test]
969    fn test_storage_metrics_helper_methods() {
970        let metrics = StorageMetrics::new();
971
972        metrics.record_append_success(1024);
973        assert_eq!(metrics.get_messages_total(), 1);
974        assert_eq!(metrics.get_disk_usage_bytes(), 1024);
975
976        metrics.record_delete_success(512, 1);
977        assert_eq!(metrics.get_messages_total(), 0);
978        assert_eq!(metrics.get_disk_usage_bytes(), 512);
979
980        metrics.record_append_failure();
981        let summary = metrics.get_summary();
982        assert_eq!(summary.append_errors, 1);
983    }
984
985    #[test]
986    fn test_prometheus_export() {
987        let metrics = StorageMetrics::new();
988
989        metrics.inc_messages_total(100);
990        metrics.add_disk_usage_bytes(1048576);
991        metrics.inc_mailboxes_total();
992        metrics.inc_append_operations();
993
994        let output = metrics.export_prometheus();
995
996        assert!(output.contains("rusmes_storage_messages_total 100"));
997        assert!(output.contains("rusmes_storage_disk_usage_bytes 1048576"));
998        assert!(output.contains("rusmes_storage_mailboxes_total 1"));
999        assert!(output.contains("rusmes_storage_append_operations_total 1"));
1000        assert!(output.contains("# HELP"));
1001        assert!(output.contains("# TYPE"));
1002    }
1003
1004    #[test]
1005    fn test_timer() {
1006        let metrics = StorageMetrics::new();
1007
1008        let timer = metrics.start_append_timer();
1009        std::thread::sleep(std::time::Duration::from_millis(10));
1010        timer.observe();
1011
1012        assert!(metrics.append_latency.get_count() > 0);
1013        assert!(metrics.append_latency.average() >= 0.01);
1014    }
1015}