1use std::collections::HashMap;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{Arc, RwLock};
36use std::time::Instant;
37
/// Fixed-bucket histogram for latency observations, exportable in the
/// Prometheus text exposition format.
///
/// Values are observed in seconds. Bucket counters are cumulative: one
/// observation increments every bucket whose upper bound is greater than or
/// equal to the value. All counters live behind `Arc`s, so a cloned
/// `Histogram` shares its state with the value it was cloned from.
#[derive(Debug, Clone)]
pub struct Histogram {
    /// Inclusive upper bound (`le`) of each bucket, in seconds.
    buckets: Vec<f64>,
    /// Cumulative observation count per bucket, parallel to `buckets`.
    counts: Vec<Arc<AtomicU64>>,
    /// Sum of all observed values, stored as whole nanoseconds.
    sum: Arc<AtomicU64>,
    /// Total number of observations.
    count: Arc<AtomicU64>,
}

impl Histogram {
    /// Scale factor between the seconds passed to `observe` and the
    /// nanoseconds accumulated in `sum`.
    const NANOS_PER_SECOND: f64 = 1_000_000_000.0;
    /// Nanoseconds per millisecond, used to report `get_sum` in milliseconds.
    const NANOS_PER_MILLI: u64 = 1_000_000;

    /// Creates an empty histogram with one zeroed counter per entry of
    /// `buckets` (upper bounds in seconds).
    pub fn new(buckets: Vec<f64>) -> Self {
        let counts = buckets
            .iter()
            .map(|_| Arc::new(AtomicU64::new(0)))
            .collect();
        Self {
            buckets,
            counts,
            sum: Arc::new(AtomicU64::new(0)),
            count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Records one observation of `value` seconds.
    ///
    /// The value is added to the running sum with nanosecond resolution, so
    /// sub-millisecond latencies are no longer truncated to zero.
    pub fn observe(&self, value: f64) {
        // Float-to-int `as` casts saturate: negative and NaN inputs add 0.
        let nanos = (value * Self::NANOS_PER_SECOND).round() as u64;
        self.sum.fetch_add(nanos, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        // Cumulative buckets: bump every bucket that can contain the value.
        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            if value <= *bound {
                counter.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Renders the histogram as Prometheus text: `# HELP`, `# TYPE`, one
    /// `<name>_bucket` line per bound, the `+Inf` bucket, `<name>_sum`
    /// (seconds) and `<name>_count`.
    pub fn export(&self, name: &str, help: &str) -> String {
        let mut output = String::new();
        output.push_str(&format!("# HELP {} {}\n", name, help));
        output.push_str(&format!("# TYPE {} histogram\n", name));

        for (bound, counter) in self.buckets.iter().zip(&self.counts) {
            output.push_str(&format!(
                "{}_bucket{{le=\"{}\"}} {}\n",
                name,
                bound,
                counter.load(Ordering::Relaxed)
            ));
        }

        // Load the total once so the `+Inf` bucket and `_count` always agree,
        // even when observations arrive while the export is being built.
        let total = self.count.load(Ordering::Relaxed);
        output.push_str(&format!("{}_bucket{{le=\"+Inf\"}} {}\n", name, total));
        output.push_str(&format!("{}_sum {}\n", name, self.sum_seconds()));
        output.push_str(&format!("{}_count {}\n", name, total));

        output
    }

    /// Returns the number of observations recorded so far.
    pub fn get_count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Returns the sum of all observations in whole milliseconds
    /// (rounded down).
    pub fn get_sum(&self) -> u64 {
        self.sum.load(Ordering::Relaxed) / Self::NANOS_PER_MILLI
    }

    /// Returns the mean observed value in seconds, or `0.0` when nothing has
    /// been observed yet.
    pub fn average(&self) -> f64 {
        let count = self.get_count();
        if count == 0 {
            0.0
        } else {
            self.sum_seconds() / count as f64
        }
    }

    /// Sum of all observations converted back to seconds.
    fn sum_seconds(&self) -> f64 {
        self.sum.load(Ordering::Relaxed) as f64 / Self::NANOS_PER_SECOND
    }
}
125
126pub struct StorageTimer {
128 start: Instant,
129 histogram: Arc<Histogram>,
130}
131
132impl StorageTimer {
133 fn new(histogram: Arc<Histogram>) -> Self {
134 Self {
135 start: Instant::now(),
136 histogram,
137 }
138 }
139
140 pub fn observe(self) {
142 let duration = self.start.elapsed().as_secs_f64();
143 self.histogram.observe(duration);
144 }
145
146 pub fn elapsed(&self) -> f64 {
148 self.start.elapsed().as_secs_f64()
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct StorageMetrics {
155 messages_total: Arc<AtomicU64>,
157 messages_deleted: Arc<AtomicU64>,
158 messages_flagged: Arc<AtomicU64>,
159 messages_seen: Arc<AtomicU64>,
160 messages_unseen: Arc<AtomicU64>,
161
162 mailboxes_total: Arc<AtomicU64>,
164 mailboxes_created: Arc<AtomicU64>,
165 mailboxes_deleted: Arc<AtomicU64>,
166
167 disk_usage_total_bytes: Arc<AtomicU64>,
169
170 disk_usage_per_user: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
172
173 messages_per_mailbox: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
175
176 append_operations_total: Arc<AtomicU64>,
178 fetch_operations_total: Arc<AtomicU64>,
179 delete_operations_total: Arc<AtomicU64>,
180 search_operations_total: Arc<AtomicU64>,
181 copy_operations_total: Arc<AtomicU64>,
182
183 append_errors_total: Arc<AtomicU64>,
185 fetch_errors_total: Arc<AtomicU64>,
186 delete_errors_total: Arc<AtomicU64>,
187 search_errors_total: Arc<AtomicU64>,
188
189 backend_healthy: Arc<AtomicU64>,
191 backend_last_check: Arc<AtomicU64>,
192
193 append_latency: Arc<Histogram>,
195 fetch_latency: Arc<Histogram>,
196 delete_latency: Arc<Histogram>,
197 search_latency: Arc<Histogram>,
198}
199
200impl Default for StorageMetrics {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl StorageMetrics {
207 pub fn new() -> Self {
209 let latency_buckets = vec![
212 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
213 ];
214
215 Self {
216 messages_total: Arc::new(AtomicU64::new(0)),
217 messages_deleted: Arc::new(AtomicU64::new(0)),
218 messages_flagged: Arc::new(AtomicU64::new(0)),
219 messages_seen: Arc::new(AtomicU64::new(0)),
220 messages_unseen: Arc::new(AtomicU64::new(0)),
221 mailboxes_total: Arc::new(AtomicU64::new(0)),
222 mailboxes_created: Arc::new(AtomicU64::new(0)),
223 mailboxes_deleted: Arc::new(AtomicU64::new(0)),
224 disk_usage_total_bytes: Arc::new(AtomicU64::new(0)),
225 disk_usage_per_user: Arc::new(RwLock::new(HashMap::new())),
226 messages_per_mailbox: Arc::new(RwLock::new(HashMap::new())),
227 append_operations_total: Arc::new(AtomicU64::new(0)),
228 fetch_operations_total: Arc::new(AtomicU64::new(0)),
229 delete_operations_total: Arc::new(AtomicU64::new(0)),
230 search_operations_total: Arc::new(AtomicU64::new(0)),
231 copy_operations_total: Arc::new(AtomicU64::new(0)),
232 append_errors_total: Arc::new(AtomicU64::new(0)),
233 fetch_errors_total: Arc::new(AtomicU64::new(0)),
234 delete_errors_total: Arc::new(AtomicU64::new(0)),
235 search_errors_total: Arc::new(AtomicU64::new(0)),
236 backend_healthy: Arc::new(AtomicU64::new(1)),
237 backend_last_check: Arc::new(AtomicU64::new(0)),
238 append_latency: Arc::new(Histogram::new(latency_buckets.clone())),
239 fetch_latency: Arc::new(Histogram::new(latency_buckets.clone())),
240 delete_latency: Arc::new(Histogram::new(latency_buckets.clone())),
241 search_latency: Arc::new(Histogram::new(latency_buckets)),
242 }
243 }
244
245 pub fn inc_messages_total(&self, count: u64) {
249 self.messages_total.fetch_add(count, Ordering::Relaxed);
250 }
251
252 pub fn dec_messages_total(&self, count: u64) {
254 self.messages_total.fetch_sub(count, Ordering::Relaxed);
255 }
256
257 pub fn set_messages_total(&self, count: u64) {
259 self.messages_total.store(count, Ordering::Relaxed);
260 }
261
262 pub fn get_messages_total(&self) -> u64 {
264 self.messages_total.load(Ordering::Relaxed)
265 }
266
267 pub fn inc_messages_deleted(&self, count: u64) {
269 self.messages_deleted.fetch_add(count, Ordering::Relaxed);
270 }
271
272 pub fn inc_messages_flagged(&self) {
274 self.messages_flagged.fetch_add(1, Ordering::Relaxed);
275 }
276
277 pub fn dec_messages_flagged(&self) {
279 self.messages_flagged.fetch_sub(1, Ordering::Relaxed);
280 }
281
282 pub fn inc_messages_seen(&self) {
284 self.messages_seen.fetch_add(1, Ordering::Relaxed);
285 }
286
287 pub fn dec_messages_seen(&self) {
289 self.messages_seen.fetch_sub(1, Ordering::Relaxed);
290 }
291
292 pub fn inc_messages_unseen(&self) {
294 self.messages_unseen.fetch_add(1, Ordering::Relaxed);
295 }
296
297 pub fn dec_messages_unseen(&self) {
299 self.messages_unseen.fetch_sub(1, Ordering::Relaxed);
300 }
301
302 pub fn inc_mailboxes_total(&self) {
306 self.mailboxes_total.fetch_add(1, Ordering::Relaxed);
307 }
308
309 pub fn dec_mailboxes_total(&self) {
311 self.mailboxes_total.fetch_sub(1, Ordering::Relaxed);
312 }
313
314 pub fn set_mailboxes_total(&self, count: u64) {
316 self.mailboxes_total.store(count, Ordering::Relaxed);
317 }
318
319 pub fn inc_mailboxes_created(&self) {
321 self.mailboxes_created.fetch_add(1, Ordering::Relaxed);
322 self.inc_mailboxes_total();
323 }
324
325 pub fn inc_mailboxes_deleted(&self) {
327 self.mailboxes_deleted.fetch_add(1, Ordering::Relaxed);
328 self.dec_mailboxes_total();
329 }
330
331 pub fn add_disk_usage_bytes(&self, bytes: u64) {
335 self.disk_usage_total_bytes
336 .fetch_add(bytes, Ordering::Relaxed);
337 }
338
339 pub fn sub_disk_usage_bytes(&self, bytes: u64) {
341 self.disk_usage_total_bytes
342 .fetch_sub(bytes, Ordering::Relaxed);
343 }
344
345 pub fn set_disk_usage_bytes(&self, bytes: u64) {
347 self.disk_usage_total_bytes.store(bytes, Ordering::Relaxed);
348 }
349
350 pub fn get_disk_usage_bytes(&self) -> u64 {
352 self.disk_usage_total_bytes.load(Ordering::Relaxed)
353 }
354
355 pub fn set_user_disk_usage(&self, user: &str, bytes: u64) {
357 if let Ok(mut map) = self.disk_usage_per_user.write() {
358 map.entry(user.to_string())
359 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
360 .store(bytes, Ordering::Relaxed);
361 }
362 }
363
364 pub fn add_user_disk_usage(&self, user: &str, bytes: u64) {
366 let found = if let Ok(map) = self.disk_usage_per_user.read() {
367 if let Some(counter) = map.get(user) {
368 counter.fetch_add(bytes, Ordering::Relaxed);
369 true
370 } else {
371 false
372 }
373 } else {
374 false
375 };
376 if !found {
377 self.set_user_disk_usage(user, bytes);
378 }
379 self.add_disk_usage_bytes(bytes);
380 }
381
382 pub fn sub_user_disk_usage(&self, user: &str, bytes: u64) {
384 if let Ok(map) = self.disk_usage_per_user.read() {
385 if let Some(counter) = map.get(user) {
386 counter.fetch_sub(bytes, Ordering::Relaxed);
387 }
388 }
389 self.sub_disk_usage_bytes(bytes);
390 }
391
392 pub fn get_user_disk_usage(&self, user: &str) -> u64 {
394 self.disk_usage_per_user
395 .read()
396 .ok()
397 .and_then(|map| map.get(user).map(|c| c.load(Ordering::Relaxed)))
398 .unwrap_or(0)
399 }
400
401 pub fn set_mailbox_message_count(&self, mailbox_id: &str, count: u64) {
405 if let Ok(mut map) = self.messages_per_mailbox.write() {
406 map.entry(mailbox_id.to_string())
407 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
408 .store(count, Ordering::Relaxed);
409 }
410 }
411
412 pub fn inc_mailbox_message_count(&self, mailbox_id: &str, count: u64) {
414 let found = if let Ok(map) = self.messages_per_mailbox.read() {
415 if let Some(counter) = map.get(mailbox_id) {
416 counter.fetch_add(count, Ordering::Relaxed);
417 true
418 } else {
419 false
420 }
421 } else {
422 false
423 };
424 if !found {
425 self.set_mailbox_message_count(mailbox_id, count);
426 }
427 }
428
429 pub fn dec_mailbox_message_count(&self, mailbox_id: &str, count: u64) {
431 if let Ok(map) = self.messages_per_mailbox.read() {
432 if let Some(counter) = map.get(mailbox_id) {
433 counter.fetch_sub(count, Ordering::Relaxed);
434 }
435 }
436 }
437
438 pub fn get_mailbox_message_count(&self, mailbox_id: &str) -> u64 {
440 self.messages_per_mailbox
441 .read()
442 .ok()
443 .and_then(|map| map.get(mailbox_id).map(|c| c.load(Ordering::Relaxed)))
444 .unwrap_or(0)
445 }
446
447 pub fn inc_append_operations(&self) {
451 self.append_operations_total.fetch_add(1, Ordering::Relaxed);
452 }
453
454 pub fn inc_fetch_operations(&self) {
456 self.fetch_operations_total.fetch_add(1, Ordering::Relaxed);
457 }
458
459 pub fn inc_delete_operations(&self) {
461 self.delete_operations_total.fetch_add(1, Ordering::Relaxed);
462 }
463
464 pub fn inc_search_operations(&self) {
466 self.search_operations_total.fetch_add(1, Ordering::Relaxed);
467 }
468
469 pub fn inc_copy_operations(&self) {
471 self.copy_operations_total.fetch_add(1, Ordering::Relaxed);
472 }
473
474 pub fn inc_append_errors(&self) {
478 self.append_errors_total.fetch_add(1, Ordering::Relaxed);
479 }
480
481 pub fn inc_fetch_errors(&self) {
483 self.fetch_errors_total.fetch_add(1, Ordering::Relaxed);
484 }
485
486 pub fn inc_delete_errors(&self) {
488 self.delete_errors_total.fetch_add(1, Ordering::Relaxed);
489 }
490
491 pub fn inc_search_errors(&self) {
493 self.search_errors_total.fetch_add(1, Ordering::Relaxed);
494 }
495
496 pub fn set_backend_healthy(&self, healthy: bool) {
500 self.backend_healthy
501 .store(if healthy { 1 } else { 0 }, Ordering::Relaxed);
502 }
503
504 pub fn is_backend_healthy(&self) -> bool {
506 self.backend_healthy.load(Ordering::Relaxed) == 1
507 }
508
509 pub fn update_health_check_time(&self, timestamp: u64) {
511 self.backend_last_check.store(timestamp, Ordering::Relaxed);
512 }
513
514 pub fn get_last_health_check(&self) -> u64 {
516 self.backend_last_check.load(Ordering::Relaxed)
517 }
518
519 pub fn start_append_timer(&self) -> StorageTimer {
523 StorageTimer::new(Arc::clone(&self.append_latency))
524 }
525
526 pub fn start_fetch_timer(&self) -> StorageTimer {
528 StorageTimer::new(Arc::clone(&self.fetch_latency))
529 }
530
531 pub fn start_delete_timer(&self) -> StorageTimer {
533 StorageTimer::new(Arc::clone(&self.delete_latency))
534 }
535
536 pub fn start_search_timer(&self) -> StorageTimer {
538 StorageTimer::new(Arc::clone(&self.search_latency))
539 }
540
541 pub fn record_append_success(&self, size_bytes: u64) {
545 self.inc_append_operations();
546 self.inc_messages_total(1);
547 self.add_disk_usage_bytes(size_bytes);
548 }
549
550 pub fn record_append_failure(&self) {
552 self.inc_append_errors();
553 }
554
555 pub fn record_delete_success(&self, size_bytes: u64, count: u64) {
557 self.inc_delete_operations();
558 self.dec_messages_total(count);
559 self.inc_messages_deleted(count);
560 self.sub_disk_usage_bytes(size_bytes);
561 }
562
563 pub fn record_delete_failure(&self) {
565 self.inc_delete_errors();
566 }
567
568 pub fn record_fetch_success(&self) {
570 self.inc_fetch_operations();
571 }
572
573 pub fn record_fetch_failure(&self) {
575 self.inc_fetch_errors();
576 }
577
578 pub fn record_search_success(&self) {
580 self.inc_search_operations();
581 }
582
583 pub fn record_search_failure(&self) {
585 self.inc_search_errors();
586 }
587
588 pub fn export_prometheus(&self) -> String {
590 let mut output = String::new();
591
592 output
594 .push_str("# HELP rusmes_storage_messages_total Total number of messages in storage\n");
595 output.push_str("# TYPE rusmes_storage_messages_total gauge\n");
596 output.push_str(&format!(
597 "rusmes_storage_messages_total {}\n",
598 self.messages_total.load(Ordering::Relaxed)
599 ));
600
601 output.push_str(
602 "# HELP rusmes_storage_messages_deleted_total Total number of deleted messages\n",
603 );
604 output.push_str("# TYPE rusmes_storage_messages_deleted_total counter\n");
605 output.push_str(&format!(
606 "rusmes_storage_messages_deleted_total {}\n",
607 self.messages_deleted.load(Ordering::Relaxed)
608 ));
609
610 output.push_str("# HELP rusmes_storage_messages_flagged Number of flagged messages\n");
611 output.push_str("# TYPE rusmes_storage_messages_flagged gauge\n");
612 output.push_str(&format!(
613 "rusmes_storage_messages_flagged {}\n",
614 self.messages_flagged.load(Ordering::Relaxed)
615 ));
616
617 output.push_str("# HELP rusmes_storage_messages_seen Number of seen messages\n");
618 output.push_str("# TYPE rusmes_storage_messages_seen gauge\n");
619 output.push_str(&format!(
620 "rusmes_storage_messages_seen {}\n",
621 self.messages_seen.load(Ordering::Relaxed)
622 ));
623
624 output.push_str("# HELP rusmes_storage_messages_unseen Number of unseen messages\n");
625 output.push_str("# TYPE rusmes_storage_messages_unseen gauge\n");
626 output.push_str(&format!(
627 "rusmes_storage_messages_unseen {}\n",
628 self.messages_unseen.load(Ordering::Relaxed)
629 ));
630
631 output.push_str("# HELP rusmes_storage_mailboxes_total Total number of mailboxes\n");
633 output.push_str("# TYPE rusmes_storage_mailboxes_total gauge\n");
634 output.push_str(&format!(
635 "rusmes_storage_mailboxes_total {}\n",
636 self.mailboxes_total.load(Ordering::Relaxed)
637 ));
638
639 output.push_str(
640 "# HELP rusmes_storage_mailboxes_created_total Total number of mailboxes created\n",
641 );
642 output.push_str("# TYPE rusmes_storage_mailboxes_created_total counter\n");
643 output.push_str(&format!(
644 "rusmes_storage_mailboxes_created_total {}\n",
645 self.mailboxes_created.load(Ordering::Relaxed)
646 ));
647
648 output.push_str(
649 "# HELP rusmes_storage_mailboxes_deleted_total Total number of mailboxes deleted\n",
650 );
651 output.push_str("# TYPE rusmes_storage_mailboxes_deleted_total counter\n");
652 output.push_str(&format!(
653 "rusmes_storage_mailboxes_deleted_total {}\n",
654 self.mailboxes_deleted.load(Ordering::Relaxed)
655 ));
656
657 output.push_str("# HELP rusmes_storage_disk_usage_bytes Total disk usage in bytes\n");
659 output.push_str("# TYPE rusmes_storage_disk_usage_bytes gauge\n");
660 output.push_str(&format!(
661 "rusmes_storage_disk_usage_bytes {}\n",
662 self.disk_usage_total_bytes.load(Ordering::Relaxed)
663 ));
664
665 if let Ok(map) = self.disk_usage_per_user.read() {
667 if !map.is_empty() {
668 output.push_str(
669 "# HELP rusmes_storage_user_disk_usage_bytes Disk usage per user in bytes\n",
670 );
671 output.push_str("# TYPE rusmes_storage_user_disk_usage_bytes gauge\n");
672 for (user, counter) in map.iter() {
673 output.push_str(&format!(
674 "rusmes_storage_user_disk_usage_bytes{{user=\"{}\"}} {}\n",
675 user,
676 counter.load(Ordering::Relaxed)
677 ));
678 }
679 }
680 }
681
682 if let Ok(map) = self.messages_per_mailbox.read() {
684 if !map.is_empty() {
685 output
686 .push_str("# HELP rusmes_storage_mailbox_messages Message count per mailbox\n");
687 output.push_str("# TYPE rusmes_storage_mailbox_messages gauge\n");
688 for (mailbox_id, counter) in map.iter() {
689 output.push_str(&format!(
690 "rusmes_storage_mailbox_messages{{mailbox=\"{}\"}} {}\n",
691 mailbox_id,
692 counter.load(Ordering::Relaxed)
693 ));
694 }
695 }
696 }
697
698 output.push_str("# HELP rusmes_storage_append_operations_total Total append operations\n");
700 output.push_str("# TYPE rusmes_storage_append_operations_total counter\n");
701 output.push_str(&format!(
702 "rusmes_storage_append_operations_total {}\n",
703 self.append_operations_total.load(Ordering::Relaxed)
704 ));
705
706 output.push_str("# HELP rusmes_storage_fetch_operations_total Total fetch operations\n");
707 output.push_str("# TYPE rusmes_storage_fetch_operations_total counter\n");
708 output.push_str(&format!(
709 "rusmes_storage_fetch_operations_total {}\n",
710 self.fetch_operations_total.load(Ordering::Relaxed)
711 ));
712
713 output.push_str("# HELP rusmes_storage_delete_operations_total Total delete operations\n");
714 output.push_str("# TYPE rusmes_storage_delete_operations_total counter\n");
715 output.push_str(&format!(
716 "rusmes_storage_delete_operations_total {}\n",
717 self.delete_operations_total.load(Ordering::Relaxed)
718 ));
719
720 output.push_str("# HELP rusmes_storage_search_operations_total Total search operations\n");
721 output.push_str("# TYPE rusmes_storage_search_operations_total counter\n");
722 output.push_str(&format!(
723 "rusmes_storage_search_operations_total {}\n",
724 self.search_operations_total.load(Ordering::Relaxed)
725 ));
726
727 output.push_str("# HELP rusmes_storage_copy_operations_total Total copy operations\n");
728 output.push_str("# TYPE rusmes_storage_copy_operations_total counter\n");
729 output.push_str(&format!(
730 "rusmes_storage_copy_operations_total {}\n",
731 self.copy_operations_total.load(Ordering::Relaxed)
732 ));
733
734 output.push_str("# HELP rusmes_storage_append_errors_total Total append errors\n");
736 output.push_str("# TYPE rusmes_storage_append_errors_total counter\n");
737 output.push_str(&format!(
738 "rusmes_storage_append_errors_total {}\n",
739 self.append_errors_total.load(Ordering::Relaxed)
740 ));
741
742 output.push_str("# HELP rusmes_storage_fetch_errors_total Total fetch errors\n");
743 output.push_str("# TYPE rusmes_storage_fetch_errors_total counter\n");
744 output.push_str(&format!(
745 "rusmes_storage_fetch_errors_total {}\n",
746 self.fetch_errors_total.load(Ordering::Relaxed)
747 ));
748
749 output.push_str("# HELP rusmes_storage_delete_errors_total Total delete errors\n");
750 output.push_str("# TYPE rusmes_storage_delete_errors_total counter\n");
751 output.push_str(&format!(
752 "rusmes_storage_delete_errors_total {}\n",
753 self.delete_errors_total.load(Ordering::Relaxed)
754 ));
755
756 output.push_str("# HELP rusmes_storage_search_errors_total Total search errors\n");
757 output.push_str("# TYPE rusmes_storage_search_errors_total counter\n");
758 output.push_str(&format!(
759 "rusmes_storage_search_errors_total {}\n",
760 self.search_errors_total.load(Ordering::Relaxed)
761 ));
762
763 output.push_str("# HELP rusmes_storage_backend_healthy Backend health status (1=healthy, 0=unhealthy)\n");
765 output.push_str("# TYPE rusmes_storage_backend_healthy gauge\n");
766 output.push_str(&format!(
767 "rusmes_storage_backend_healthy {}\n",
768 self.backend_healthy.load(Ordering::Relaxed)
769 ));
770
771 output.push_str(
772 "# HELP rusmes_storage_backend_last_check_timestamp Last health check timestamp\n",
773 );
774 output.push_str("# TYPE rusmes_storage_backend_last_check_timestamp gauge\n");
775 output.push_str(&format!(
776 "rusmes_storage_backend_last_check_timestamp {}\n",
777 self.backend_last_check.load(Ordering::Relaxed)
778 ));
779
780 output.push_str(&self.append_latency.export(
782 "rusmes_storage_append_latency_seconds",
783 "Append operation latency in seconds",
784 ));
785
786 output.push_str(&self.fetch_latency.export(
787 "rusmes_storage_fetch_latency_seconds",
788 "Fetch operation latency in seconds",
789 ));
790
791 output.push_str(&self.delete_latency.export(
792 "rusmes_storage_delete_latency_seconds",
793 "Delete operation latency in seconds",
794 ));
795
796 output.push_str(&self.search_latency.export(
797 "rusmes_storage_search_latency_seconds",
798 "Search operation latency in seconds",
799 ));
800
801 output
802 }
803
804 pub fn get_summary(&self) -> MetricsSummary {
806 MetricsSummary {
807 messages_total: self.messages_total.load(Ordering::Relaxed),
808 messages_deleted: self.messages_deleted.load(Ordering::Relaxed),
809 mailboxes_total: self.mailboxes_total.load(Ordering::Relaxed),
810 disk_usage_bytes: self.disk_usage_total_bytes.load(Ordering::Relaxed),
811 append_operations: self.append_operations_total.load(Ordering::Relaxed),
812 fetch_operations: self.fetch_operations_total.load(Ordering::Relaxed),
813 delete_operations: self.delete_operations_total.load(Ordering::Relaxed),
814 search_operations: self.search_operations_total.load(Ordering::Relaxed),
815 append_errors: self.append_errors_total.load(Ordering::Relaxed),
816 fetch_errors: self.fetch_errors_total.load(Ordering::Relaxed),
817 delete_errors: self.delete_errors_total.load(Ordering::Relaxed),
818 search_errors: self.search_errors_total.load(Ordering::Relaxed),
819 backend_healthy: self.is_backend_healthy(),
820 append_avg_latency_ms: self.append_latency.average() * 1000.0,
821 fetch_avg_latency_ms: self.fetch_latency.average() * 1000.0,
822 delete_avg_latency_ms: self.delete_latency.average() * 1000.0,
823 search_avg_latency_ms: self.search_latency.average() * 1000.0,
824 }
825 }
826}
827
/// Point-in-time snapshot of the headline storage metrics.
///
/// Derives `PartialEq` so two snapshots can be compared directly, e.g. in
/// assertions or to detect whether anything changed between two reads.
#[derive(Debug, Clone, PartialEq)]
pub struct MetricsSummary {
    /// Current number of messages.
    pub messages_total: u64,
    /// Number of messages deleted so far.
    pub messages_deleted: u64,
    /// Current number of mailboxes.
    pub mailboxes_total: u64,
    /// Total disk usage in bytes.
    pub disk_usage_bytes: u64,
    /// Number of append operations.
    pub append_operations: u64,
    /// Number of fetch operations.
    pub fetch_operations: u64,
    /// Number of delete operations.
    pub delete_operations: u64,
    /// Number of search operations.
    pub search_operations: u64,
    /// Number of failed appends.
    pub append_errors: u64,
    /// Number of failed fetches.
    pub fetch_errors: u64,
    /// Number of failed deletes.
    pub delete_errors: u64,
    /// Number of failed searches.
    pub search_errors: u64,
    /// Whether the backend was marked healthy when the snapshot was taken.
    pub backend_healthy: bool,
    /// Mean append latency in milliseconds.
    pub append_avg_latency_ms: f64,
    /// Mean fetch latency in milliseconds.
    pub fetch_avg_latency_ms: f64,
    /// Mean delete latency in milliseconds.
    pub delete_avg_latency_ms: f64,
    /// Mean search latency in milliseconds.
    pub search_avg_latency_ms: f64,
}
849
#[cfg(test)]
mod tests {
    use super::*;

    /// Three observations are counted and produce a positive mean.
    #[test]
    fn test_histogram_observe() {
        let histogram = Histogram::new(vec![0.001, 0.01, 0.1, 1.0]);

        for sample in &[0.005, 0.05, 0.5] {
            histogram.observe(*sample);
        }

        assert_eq!(histogram.get_count(), 3);
        assert!(histogram.average() > 0.0);
    }

    /// Increment, decrement and overwrite of the message total.
    #[test]
    fn test_storage_metrics_messages() {
        let storage = StorageMetrics::new();

        storage.inc_messages_total(5);
        assert_eq!(storage.get_messages_total(), 5);

        storage.dec_messages_total(2);
        assert_eq!(storage.get_messages_total(), 3);

        storage.set_messages_total(10);
        assert_eq!(storage.get_messages_total(), 10);
    }

    /// Add, subtract and overwrite of the global disk usage.
    #[test]
    fn test_storage_metrics_disk_usage() {
        let storage = StorageMetrics::new();

        storage.add_disk_usage_bytes(1024);
        assert_eq!(storage.get_disk_usage_bytes(), 1024);

        storage.sub_disk_usage_bytes(512);
        assert_eq!(storage.get_disk_usage_bytes(), 512);

        storage.set_disk_usage_bytes(2048);
        assert_eq!(storage.get_disk_usage_bytes(), 2048);
    }

    /// Per-user usage is tracked separately and rolls up into the total.
    #[test]
    fn test_storage_metrics_per_user() {
        let storage = StorageMetrics::new();

        storage.add_user_disk_usage("user1", 1024);
        storage.add_user_disk_usage("user2", 2048);

        let first = storage.get_user_disk_usage("user1");
        let second = storage.get_user_disk_usage("user2");
        assert_eq!(first, 1024);
        assert_eq!(second, 2048);
        assert_eq!(storage.get_disk_usage_bytes(), 3072);

        storage.sub_user_disk_usage("user1", 512);
        assert_eq!(storage.get_user_disk_usage("user1"), 512);
        assert_eq!(storage.get_disk_usage_bytes(), 2560);
    }

    /// Per-mailbox message counts can be set, raised and lowered.
    #[test]
    fn test_storage_metrics_per_mailbox() {
        let storage = StorageMetrics::new();
        let mailbox = "mailbox1";

        storage.set_mailbox_message_count(mailbox, 10);
        storage.inc_mailbox_message_count(mailbox, 5);

        assert_eq!(storage.get_mailbox_message_count(mailbox), 15);

        storage.dec_mailbox_message_count(mailbox, 3);
        assert_eq!(storage.get_mailbox_message_count(mailbox), 12);
    }

    /// Each operation counter shows up in the summary.
    #[test]
    fn test_storage_metrics_operations() {
        let storage = StorageMetrics::new();

        storage.inc_append_operations();
        storage.inc_fetch_operations();
        storage.inc_delete_operations();
        storage.inc_search_operations();

        let snapshot = storage.get_summary();
        assert_eq!(snapshot.append_operations, 1);
        assert_eq!(snapshot.fetch_operations, 1);
        assert_eq!(snapshot.delete_operations, 1);
        assert_eq!(snapshot.search_operations, 1);
    }

    /// Each error counter shows up in the summary.
    #[test]
    fn test_storage_metrics_errors() {
        let storage = StorageMetrics::new();

        storage.inc_append_errors();
        storage.inc_fetch_errors();
        storage.inc_delete_errors();
        storage.inc_search_errors();

        let snapshot = storage.get_summary();
        assert_eq!(snapshot.append_errors, 1);
        assert_eq!(snapshot.fetch_errors, 1);
        assert_eq!(snapshot.delete_errors, 1);
        assert_eq!(snapshot.search_errors, 1);
    }

    /// The backend starts healthy and follows the flag that is set.
    #[test]
    fn test_storage_metrics_backend_health() {
        let storage = StorageMetrics::new();

        assert!(storage.is_backend_healthy());

        storage.set_backend_healthy(false);
        assert!(!storage.is_backend_healthy());

        storage.set_backend_healthy(true);
        assert!(storage.is_backend_healthy());
    }

    /// The `record_*` helpers update all related counters together.
    #[test]
    fn test_storage_metrics_helper_methods() {
        let storage = StorageMetrics::new();

        storage.record_append_success(1024);
        assert_eq!(storage.get_messages_total(), 1);
        assert_eq!(storage.get_disk_usage_bytes(), 1024);

        storage.record_delete_success(512, 1);
        assert_eq!(storage.get_messages_total(), 0);
        assert_eq!(storage.get_disk_usage_bytes(), 512);

        storage.record_append_failure();
        let snapshot = storage.get_summary();
        assert_eq!(snapshot.append_errors, 1);
    }

    /// The Prometheus export contains the recorded values and metadata.
    #[test]
    fn test_prometheus_export() {
        let storage = StorageMetrics::new();

        storage.inc_messages_total(100);
        storage.add_disk_usage_bytes(1048576);
        storage.inc_mailboxes_total();
        storage.inc_append_operations();

        let rendered = storage.export_prometheus();

        let expected_fragments = [
            "rusmes_storage_messages_total 100",
            "rusmes_storage_disk_usage_bytes 1048576",
            "rusmes_storage_mailboxes_total 1",
            "rusmes_storage_append_operations_total 1",
            "# HELP",
            "# TYPE",
        ];
        for fragment in expected_fragments.iter() {
            assert!(rendered.contains(fragment));
        }
    }

    /// A timer that ran for 10 ms records at least 0.01 s.
    #[test]
    fn test_timer() {
        let storage = StorageMetrics::new();

        let stopwatch = storage.start_append_timer();
        std::thread::sleep(std::time::Duration::from_millis(10));
        stopwatch.observe();

        let latency = &storage.append_latency;
        assert!(latency.get_count() > 0);
        assert!(latency.average() >= 0.01);
    }
}