1use prometheus::{
2 Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
3 Registry,
4};
5use serde::Serialize;
6use std::{
7 collections::HashMap,
8 sync::{
9 Arc,
10 atomic::{AtomicU64, Ordering},
11 },
12 time::{Duration, Instant},
13};
14
15pub struct MetricsRegistry {
17 registry: Registry,
19
20 pub events_ingested_total: IntCounter,
22 pub events_ingested_by_type: IntCounterVec,
23 pub ingestion_duration_seconds: Histogram,
24 pub ingestion_errors_total: IntCounter,
25
26 pub queries_total: IntCounterVec,
28 pub query_duration_seconds: HistogramVec,
29 pub query_results_total: IntCounterVec,
30
31 pub storage_events_total: IntGauge,
33 pub storage_entities_total: IntGauge,
34 pub storage_size_bytes: IntGauge,
35 pub parquet_files_total: IntGauge,
36 pub wal_segments_total: IntGauge,
37
38 pub projections_total: IntGauge,
40 pub projection_events_processed: IntCounterVec,
41 pub projection_errors_total: IntCounterVec,
42 pub projection_processing_duration: HistogramVec,
43 pub projection_duration_seconds: Histogram,
44
45 pub schemas_registered_total: IntCounter,
47 pub schema_validations_total: IntCounterVec,
48 pub schema_validation_duration: Histogram,
49
50 pub replays_started_total: IntCounter,
52 pub replays_completed_total: IntCounter,
53 pub replays_failed_total: IntCounter,
54 pub replay_events_processed: IntCounter,
55 pub replay_duration_seconds: Histogram,
56
57 pub pipelines_registered_total: IntGauge,
59 pub pipeline_events_processed: IntCounterVec,
60 pub pipeline_events_filtered: IntCounterVec,
61 pub pipeline_errors_total: IntCounterVec,
62 pub pipeline_processing_duration: HistogramVec,
63 pub pipeline_duration_seconds: Histogram,
64
65 pub snapshots_created_total: IntCounter,
67 pub snapshot_creation_duration: Histogram,
68 pub snapshots_total: IntGauge,
69
70 pub compactions_total: IntCounter,
72 pub compaction_duration_seconds: Histogram,
73 pub compaction_files_merged: IntCounter,
74 pub compaction_bytes_saved: IntCounter,
75
76 pub websocket_connections_active: IntGauge,
78 pub websocket_connections_total: IntCounter,
79 pub websocket_messages_sent: IntCounter,
80 pub websocket_errors_total: IntCounter,
81
82 pub http_requests_total: IntCounterVec,
84 pub http_request_duration_seconds: HistogramVec,
85 pub http_requests_in_flight: IntGauge,
86
87 pub replication_followers_connected: IntGauge,
89 pub replication_wal_shipped_total: IntCounter,
90 pub replication_wal_shipped_bytes_total: IntCounter,
91 pub replication_follower_lag_seconds: IntGaugeVec,
92 pub replication_acks_total: IntCounter,
93
94 pub replication_ack_wait_seconds: Histogram,
96
97 pub replication_wal_received_total: IntCounter,
99 pub replication_wal_replayed_total: IntCounter,
100 pub replication_lag_seconds: IntGauge,
101 pub replication_connected: IntGauge,
102 pub replication_reconnects_total: IntCounter,
103}
104
105impl MetricsRegistry {
106 pub fn new() -> Arc<Self> {
107 let registry = Registry::new();
108
109 let events_ingested_total = IntCounter::with_opts(Opts::new(
111 "allsource_events_ingested_total",
112 "Total number of events ingested",
113 ))
114 .unwrap();
115
116 let events_ingested_by_type = IntCounterVec::new(
117 Opts::new(
118 "allsource_events_ingested_by_type",
119 "Events ingested by type",
120 ),
121 &["event_type"],
122 )
123 .unwrap();
124
125 let ingestion_duration_seconds = Histogram::with_opts(HistogramOpts::new(
126 "allsource_ingestion_duration_seconds",
127 "Event ingestion duration in seconds",
128 ))
129 .unwrap();
130
131 let ingestion_errors_total = IntCounter::with_opts(Opts::new(
132 "allsource_ingestion_errors_total",
133 "Total number of ingestion errors",
134 ))
135 .unwrap();
136
137 let queries_total = IntCounterVec::new(
139 Opts::new("allsource_queries_total", "Total number of queries"),
140 &["query_type"],
141 )
142 .unwrap();
143
144 let query_duration_seconds = HistogramVec::new(
145 HistogramOpts::new(
146 "allsource_query_duration_seconds",
147 "Query duration in seconds",
148 ),
149 &["query_type"],
150 )
151 .unwrap();
152
153 let query_results_total = IntCounterVec::new(
154 Opts::new(
155 "allsource_query_results_total",
156 "Total number of events returned by queries",
157 ),
158 &["query_type"],
159 )
160 .unwrap();
161
162 let storage_events_total = IntGauge::with_opts(Opts::new(
164 "allsource_storage_events_total",
165 "Total number of events in storage",
166 ))
167 .unwrap();
168
169 let storage_entities_total = IntGauge::with_opts(Opts::new(
170 "allsource_storage_entities_total",
171 "Total number of entities in storage",
172 ))
173 .unwrap();
174
175 let storage_size_bytes = IntGauge::with_opts(Opts::new(
176 "allsource_storage_size_bytes",
177 "Total storage size in bytes",
178 ))
179 .unwrap();
180
181 let parquet_files_total = IntGauge::with_opts(Opts::new(
182 "allsource_parquet_files_total",
183 "Number of Parquet files",
184 ))
185 .unwrap();
186
187 let wal_segments_total = IntGauge::with_opts(Opts::new(
188 "allsource_wal_segments_total",
189 "Number of WAL segments",
190 ))
191 .unwrap();
192
193 let projection_events_processed = IntCounterVec::new(
195 Opts::new(
196 "allsource_projection_events_processed",
197 "Events processed by projections",
198 ),
199 &["projection_name"],
200 )
201 .unwrap();
202
203 let projection_errors_total = IntCounterVec::new(
204 Opts::new(
205 "allsource_projection_errors_total",
206 "Total projection errors",
207 ),
208 &["projection_name"],
209 )
210 .unwrap();
211
212 let projection_processing_duration = HistogramVec::new(
213 HistogramOpts::new(
214 "allsource_projection_processing_duration_seconds",
215 "Projection processing duration",
216 ),
217 &["projection_name"],
218 )
219 .unwrap();
220
221 let projections_total = IntGauge::with_opts(Opts::new(
222 "allsource_projections_total",
223 "Number of registered projections",
224 ))
225 .unwrap();
226
227 let projection_duration_seconds = Histogram::with_opts(HistogramOpts::new(
228 "allsource_projection_duration_seconds",
229 "Overall projection manager processing duration",
230 ))
231 .unwrap();
232
233 let schemas_registered_total = IntCounter::with_opts(Opts::new(
235 "allsource_schemas_registered_total",
236 "Total number of schemas registered",
237 ))
238 .unwrap();
239
240 let schema_validations_total = IntCounterVec::new(
241 Opts::new(
242 "allsource_schema_validations_total",
243 "Schema validations by result",
244 ),
245 &["subject", "result"],
246 )
247 .unwrap();
248
249 let schema_validation_duration = Histogram::with_opts(HistogramOpts::new(
250 "allsource_schema_validation_duration_seconds",
251 "Schema validation duration",
252 ))
253 .unwrap();
254
255 let replays_started_total = IntCounter::with_opts(Opts::new(
257 "allsource_replays_started_total",
258 "Total replays started",
259 ))
260 .unwrap();
261
262 let replays_completed_total = IntCounter::with_opts(Opts::new(
263 "allsource_replays_completed_total",
264 "Total replays completed",
265 ))
266 .unwrap();
267
268 let replays_failed_total = IntCounter::with_opts(Opts::new(
269 "allsource_replays_failed_total",
270 "Total replays failed",
271 ))
272 .unwrap();
273
274 let replay_events_processed = IntCounter::with_opts(Opts::new(
275 "allsource_replay_events_processed",
276 "Events processed during replays",
277 ))
278 .unwrap();
279
280 let replay_duration_seconds = Histogram::with_opts(HistogramOpts::new(
281 "allsource_replay_duration_seconds",
282 "Replay duration",
283 ))
284 .unwrap();
285
286 let pipelines_registered_total = IntGauge::with_opts(Opts::new(
288 "allsource_pipelines_registered_total",
289 "Number of registered pipelines",
290 ))
291 .unwrap();
292
293 let pipeline_events_processed = IntCounterVec::new(
294 Opts::new(
295 "allsource_pipeline_events_processed",
296 "Events processed by pipelines",
297 ),
298 &["pipeline_id", "pipeline_name"],
299 )
300 .unwrap();
301
302 let pipeline_events_filtered = IntCounterVec::new(
303 Opts::new(
304 "allsource_pipeline_events_filtered",
305 "Events filtered by pipelines",
306 ),
307 &["pipeline_id", "pipeline_name"],
308 )
309 .unwrap();
310
311 let pipeline_processing_duration = HistogramVec::new(
312 HistogramOpts::new(
313 "allsource_pipeline_processing_duration_seconds",
314 "Pipeline processing duration",
315 ),
316 &["pipeline_id", "pipeline_name"],
317 )
318 .unwrap();
319
320 let pipeline_errors_total = IntCounterVec::new(
321 Opts::new("allsource_pipeline_errors_total", "Total pipeline errors"),
322 &["pipeline_name"],
323 )
324 .unwrap();
325
326 let pipeline_duration_seconds = Histogram::with_opts(HistogramOpts::new(
327 "allsource_pipeline_duration_seconds",
328 "Overall pipeline manager processing duration",
329 ))
330 .unwrap();
331
332 let snapshots_created_total = IntCounter::with_opts(Opts::new(
334 "allsource_snapshots_created_total",
335 "Total snapshots created",
336 ))
337 .unwrap();
338
339 let snapshot_creation_duration = Histogram::with_opts(HistogramOpts::new(
340 "allsource_snapshot_creation_duration_seconds",
341 "Snapshot creation duration",
342 ))
343 .unwrap();
344
345 let snapshots_total = IntGauge::with_opts(Opts::new(
346 "allsource_snapshots_total",
347 "Total number of snapshots",
348 ))
349 .unwrap();
350
351 let compactions_total = IntCounter::with_opts(Opts::new(
353 "allsource_compactions_total",
354 "Total compactions performed",
355 ))
356 .unwrap();
357
358 let compaction_duration_seconds = Histogram::with_opts(HistogramOpts::new(
359 "allsource_compaction_duration_seconds",
360 "Compaction duration",
361 ))
362 .unwrap();
363
364 let compaction_files_merged = IntCounter::with_opts(Opts::new(
365 "allsource_compaction_files_merged",
366 "Files merged during compaction",
367 ))
368 .unwrap();
369
370 let compaction_bytes_saved = IntCounter::with_opts(Opts::new(
371 "allsource_compaction_bytes_saved",
372 "Bytes saved by compaction",
373 ))
374 .unwrap();
375
376 let websocket_connections_active = IntGauge::with_opts(Opts::new(
378 "allsource_websocket_connections_active",
379 "Active WebSocket connections",
380 ))
381 .unwrap();
382
383 let websocket_connections_total = IntCounter::with_opts(Opts::new(
384 "allsource_websocket_connections_total",
385 "Total WebSocket connections",
386 ))
387 .unwrap();
388
389 let websocket_messages_sent = IntCounter::with_opts(Opts::new(
390 "allsource_websocket_messages_sent",
391 "WebSocket messages sent",
392 ))
393 .unwrap();
394
395 let websocket_errors_total = IntCounter::with_opts(Opts::new(
396 "allsource_websocket_errors_total",
397 "WebSocket errors",
398 ))
399 .unwrap();
400
401 let http_requests_total = IntCounterVec::new(
403 Opts::new("allsource_http_requests_total", "Total HTTP requests"),
404 &["method", "endpoint", "status"],
405 )
406 .unwrap();
407
408 let http_request_duration_seconds = HistogramVec::new(
409 HistogramOpts::new(
410 "allsource_http_request_duration_seconds",
411 "HTTP request duration",
412 ),
413 &["method", "endpoint"],
414 )
415 .unwrap();
416
417 let http_requests_in_flight = IntGauge::with_opts(Opts::new(
418 "allsource_http_requests_in_flight",
419 "HTTP requests currently being processed",
420 ))
421 .unwrap();
422
423 let replication_followers_connected = IntGauge::with_opts(Opts::new(
425 "allsource_replication_followers_connected",
426 "Number of connected followers",
427 ))
428 .unwrap();
429
430 let replication_wal_shipped_total = IntCounter::with_opts(Opts::new(
431 "allsource_replication_wal_shipped_total",
432 "Total WAL entries shipped to followers",
433 ))
434 .unwrap();
435
436 let replication_wal_shipped_bytes_total = IntCounter::with_opts(Opts::new(
437 "allsource_replication_wal_shipped_bytes_total",
438 "Total bytes shipped to followers",
439 ))
440 .unwrap();
441
442 let replication_follower_lag_seconds = IntGaugeVec::new(
443 Opts::new(
444 "allsource_replication_follower_lag_seconds",
445 "Per-follower replication lag in seconds",
446 ),
447 &["follower_id"],
448 )
449 .unwrap();
450
451 let replication_acks_total = IntCounter::with_opts(Opts::new(
452 "allsource_replication_acks_total",
453 "Total ACKs received from followers",
454 ))
455 .unwrap();
456
457 let replication_ack_wait_seconds = Histogram::with_opts(
458 HistogramOpts::new(
459 "allsource_replication_ack_wait_seconds",
460 "Time spent waiting for follower ACKs in semi-sync/sync mode",
461 )
462 .buckets(vec![
463 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
464 ]),
465 )
466 .unwrap();
467
468 let replication_wal_received_total = IntCounter::with_opts(Opts::new(
470 "allsource_replication_wal_received_total",
471 "Total WAL entries received from leader",
472 ))
473 .unwrap();
474
475 let replication_wal_replayed_total = IntCounter::with_opts(Opts::new(
476 "allsource_replication_wal_replayed_total",
477 "Total WAL entries replayed into DashMap",
478 ))
479 .unwrap();
480
481 let replication_lag_seconds = IntGauge::with_opts(Opts::new(
482 "allsource_replication_lag_seconds",
483 "Replication lag behind leader in seconds",
484 ))
485 .unwrap();
486
487 let replication_connected = IntGauge::with_opts(Opts::new(
488 "allsource_replication_connected",
489 "Whether connected to leader (1=connected, 0=disconnected)",
490 ))
491 .unwrap();
492
493 let replication_reconnects_total = IntCounter::with_opts(Opts::new(
494 "allsource_replication_reconnects_total",
495 "Total reconnection attempts to leader",
496 ))
497 .unwrap();
498
499 registry
501 .register(Box::new(events_ingested_total.clone()))
502 .unwrap();
503 registry
504 .register(Box::new(events_ingested_by_type.clone()))
505 .unwrap();
506 registry
507 .register(Box::new(ingestion_duration_seconds.clone()))
508 .unwrap();
509 registry
510 .register(Box::new(ingestion_errors_total.clone()))
511 .unwrap();
512
513 registry.register(Box::new(queries_total.clone())).unwrap();
514 registry
515 .register(Box::new(query_duration_seconds.clone()))
516 .unwrap();
517 registry
518 .register(Box::new(query_results_total.clone()))
519 .unwrap();
520
521 registry
522 .register(Box::new(storage_events_total.clone()))
523 .unwrap();
524 registry
525 .register(Box::new(storage_entities_total.clone()))
526 .unwrap();
527 registry
528 .register(Box::new(storage_size_bytes.clone()))
529 .unwrap();
530 registry
531 .register(Box::new(parquet_files_total.clone()))
532 .unwrap();
533 registry
534 .register(Box::new(wal_segments_total.clone()))
535 .unwrap();
536
537 registry
538 .register(Box::new(projection_events_processed.clone()))
539 .unwrap();
540 registry
541 .register(Box::new(projection_errors_total.clone()))
542 .unwrap();
543 registry
544 .register(Box::new(projection_processing_duration.clone()))
545 .unwrap();
546 registry
547 .register(Box::new(projections_total.clone()))
548 .unwrap();
549 registry
550 .register(Box::new(projection_duration_seconds.clone()))
551 .unwrap();
552
553 registry
554 .register(Box::new(schemas_registered_total.clone()))
555 .unwrap();
556 registry
557 .register(Box::new(schema_validations_total.clone()))
558 .unwrap();
559 registry
560 .register(Box::new(schema_validation_duration.clone()))
561 .unwrap();
562
563 registry
564 .register(Box::new(replays_started_total.clone()))
565 .unwrap();
566 registry
567 .register(Box::new(replays_completed_total.clone()))
568 .unwrap();
569 registry
570 .register(Box::new(replays_failed_total.clone()))
571 .unwrap();
572 registry
573 .register(Box::new(replay_events_processed.clone()))
574 .unwrap();
575 registry
576 .register(Box::new(replay_duration_seconds.clone()))
577 .unwrap();
578
579 registry
580 .register(Box::new(pipelines_registered_total.clone()))
581 .unwrap();
582 registry
583 .register(Box::new(pipeline_events_processed.clone()))
584 .unwrap();
585 registry
586 .register(Box::new(pipeline_events_filtered.clone()))
587 .unwrap();
588 registry
589 .register(Box::new(pipeline_processing_duration.clone()))
590 .unwrap();
591 registry
592 .register(Box::new(pipeline_errors_total.clone()))
593 .unwrap();
594 registry
595 .register(Box::new(pipeline_duration_seconds.clone()))
596 .unwrap();
597
598 registry
599 .register(Box::new(snapshots_created_total.clone()))
600 .unwrap();
601 registry
602 .register(Box::new(snapshot_creation_duration.clone()))
603 .unwrap();
604 registry
605 .register(Box::new(snapshots_total.clone()))
606 .unwrap();
607
608 registry
609 .register(Box::new(compactions_total.clone()))
610 .unwrap();
611 registry
612 .register(Box::new(compaction_duration_seconds.clone()))
613 .unwrap();
614 registry
615 .register(Box::new(compaction_files_merged.clone()))
616 .unwrap();
617 registry
618 .register(Box::new(compaction_bytes_saved.clone()))
619 .unwrap();
620
621 registry
622 .register(Box::new(websocket_connections_active.clone()))
623 .unwrap();
624 registry
625 .register(Box::new(websocket_connections_total.clone()))
626 .unwrap();
627 registry
628 .register(Box::new(websocket_messages_sent.clone()))
629 .unwrap();
630 registry
631 .register(Box::new(websocket_errors_total.clone()))
632 .unwrap();
633
634 registry
635 .register(Box::new(http_requests_total.clone()))
636 .unwrap();
637 registry
638 .register(Box::new(http_request_duration_seconds.clone()))
639 .unwrap();
640 registry
641 .register(Box::new(http_requests_in_flight.clone()))
642 .unwrap();
643
644 registry
645 .register(Box::new(replication_followers_connected.clone()))
646 .unwrap();
647 registry
648 .register(Box::new(replication_wal_shipped_total.clone()))
649 .unwrap();
650 registry
651 .register(Box::new(replication_wal_shipped_bytes_total.clone()))
652 .unwrap();
653 registry
654 .register(Box::new(replication_follower_lag_seconds.clone()))
655 .unwrap();
656 registry
657 .register(Box::new(replication_acks_total.clone()))
658 .unwrap();
659 registry
660 .register(Box::new(replication_ack_wait_seconds.clone()))
661 .unwrap();
662 registry
663 .register(Box::new(replication_wal_received_total.clone()))
664 .unwrap();
665 registry
666 .register(Box::new(replication_wal_replayed_total.clone()))
667 .unwrap();
668 registry
669 .register(Box::new(replication_lag_seconds.clone()))
670 .unwrap();
671 registry
672 .register(Box::new(replication_connected.clone()))
673 .unwrap();
674 registry
675 .register(Box::new(replication_reconnects_total.clone()))
676 .unwrap();
677
678 Arc::new(Self {
679 registry,
680 events_ingested_total,
681 events_ingested_by_type,
682 ingestion_duration_seconds,
683 ingestion_errors_total,
684 queries_total,
685 query_duration_seconds,
686 query_results_total,
687 storage_events_total,
688 storage_entities_total,
689 storage_size_bytes,
690 parquet_files_total,
691 wal_segments_total,
692 projections_total,
693 projection_events_processed,
694 projection_errors_total,
695 projection_processing_duration,
696 projection_duration_seconds,
697 schemas_registered_total,
698 schema_validations_total,
699 schema_validation_duration,
700 replays_started_total,
701 replays_completed_total,
702 replays_failed_total,
703 replay_events_processed,
704 replay_duration_seconds,
705 pipelines_registered_total,
706 pipeline_events_processed,
707 pipeline_events_filtered,
708 pipeline_errors_total,
709 pipeline_processing_duration,
710 pipeline_duration_seconds,
711 snapshots_created_total,
712 snapshot_creation_duration,
713 snapshots_total,
714 compactions_total,
715 compaction_duration_seconds,
716 compaction_files_merged,
717 compaction_bytes_saved,
718 websocket_connections_active,
719 websocket_connections_total,
720 websocket_messages_sent,
721 websocket_errors_total,
722 http_requests_total,
723 http_request_duration_seconds,
724 http_requests_in_flight,
725 replication_followers_connected,
726 replication_wal_shipped_total,
727 replication_wal_shipped_bytes_total,
728 replication_follower_lag_seconds,
729 replication_acks_total,
730 replication_ack_wait_seconds,
731 replication_wal_received_total,
732 replication_wal_replayed_total,
733 replication_lag_seconds,
734 replication_connected,
735 replication_reconnects_total,
736 })
737 }
738
739 pub fn registry(&self) -> &Registry {
741 &self.registry
742 }
743
744 pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
746 use prometheus::Encoder;
747 let encoder = prometheus::TextEncoder::new();
748 let metric_families = self.registry.gather();
749 let mut buffer = Vec::new();
750 encoder.encode(&metric_families, &mut buffer)?;
751 Ok(String::from_utf8(buffer)?)
752 }
753}
754
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_registry_creation() {
        let metrics = MetricsRegistry::new();
        assert_eq!(metrics.events_ingested_total.get(), 0);
        assert_eq!(metrics.storage_events_total.get(), 0);
        assert_eq!(metrics.ingestion_duration_seconds.get_sample_count(), 0);
    }

    #[test]
    fn test_event_ingestion_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.events_ingested_total.inc();
        assert_eq!(metrics.events_ingested_total.get(), 1);

        metrics
            .events_ingested_by_type
            .with_label_values(&["user.created"])
            .inc();
        assert_eq!(
            metrics
                .events_ingested_by_type
                .with_label_values(&["user.created"])
                .get(),
            1
        );
        // Each label value is tracked independently.
        assert_eq!(
            metrics
                .events_ingested_by_type
                .with_label_values(&["user.deleted"])
                .get(),
            0
        );

        metrics.ingestion_duration_seconds.observe(0.1);
        assert_eq!(metrics.ingestion_duration_seconds.get_sample_count(), 1);

        metrics.ingestion_errors_total.inc();
        assert_eq!(metrics.ingestion_errors_total.get(), 1);
    }

    #[test]
    fn test_query_metrics() {
        let metrics = MetricsRegistry::new();

        metrics
            .queries_total
            .with_label_values(&["entity_id"])
            .inc();
        assert_eq!(
            metrics
                .queries_total
                .with_label_values(&["entity_id"])
                .get(),
            1
        );

        metrics
            .query_duration_seconds
            .with_label_values(&["entity_id"])
            .observe(0.05);
        assert_eq!(
            metrics
                .query_duration_seconds
                .with_label_values(&["entity_id"])
                .get_sample_count(),
            1
        );

        metrics
            .query_results_total
            .with_label_values(&["entity_id"])
            .inc_by(10);
        assert_eq!(
            metrics
                .query_results_total
                .with_label_values(&["entity_id"])
                .get(),
            10
        );
    }

    #[test]
    fn test_storage_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.storage_events_total.set(1000);
        assert_eq!(metrics.storage_events_total.get(), 1000);

        metrics.storage_entities_total.set(50);
        assert_eq!(metrics.storage_entities_total.get(), 50);

        metrics.storage_size_bytes.set(1024 * 1024);
        assert_eq!(metrics.storage_size_bytes.get(), 1024 * 1024);

        metrics.parquet_files_total.set(5);
        assert_eq!(metrics.parquet_files_total.get(), 5);

        metrics.wal_segments_total.set(3);
        assert_eq!(metrics.wal_segments_total.get(), 3);
    }

    #[test]
    fn test_projection_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.projections_total.set(3);
        assert_eq!(metrics.projections_total.get(), 3);

        metrics
            .projection_events_processed
            .with_label_values(&["user_snapshot"])
            .inc_by(100);
        assert_eq!(
            metrics
                .projection_events_processed
                .with_label_values(&["user_snapshot"])
                .get(),
            100
        );

        metrics
            .projection_processing_duration
            .with_label_values(&["user_snapshot"])
            .observe(0.2);
        assert_eq!(
            metrics
                .projection_processing_duration
                .with_label_values(&["user_snapshot"])
                .get_sample_count(),
            1
        );

        metrics
            .projection_errors_total
            .with_label_values(&["user_snapshot"])
            .inc();
        assert_eq!(
            metrics
                .projection_errors_total
                .with_label_values(&["user_snapshot"])
                .get(),
            1
        );

        metrics.projection_duration_seconds.observe(0.3);
        assert_eq!(metrics.projection_duration_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_schema_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.schemas_registered_total.inc();
        assert_eq!(metrics.schemas_registered_total.get(), 1);

        metrics
            .schema_validations_total
            .with_label_values(&["user.schema", "success"])
            .inc();
        metrics
            .schema_validations_total
            .with_label_values(&["order.schema", "failure"])
            .inc();
        assert_eq!(
            metrics
                .schema_validations_total
                .with_label_values(&["user.schema", "success"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .schema_validations_total
                .with_label_values(&["order.schema", "failure"])
                .get(),
            1
        );
        // A label combination that was never incremented stays at zero.
        assert_eq!(
            metrics
                .schema_validations_total
                .with_label_values(&["user.schema", "failure"])
                .get(),
            0
        );

        metrics.schema_validation_duration.observe(0.01);
        assert_eq!(metrics.schema_validation_duration.get_sample_count(), 1);
    }

    #[test]
    fn test_replay_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.replays_started_total.inc();
        assert_eq!(metrics.replays_started_total.get(), 1);

        metrics.replay_events_processed.inc_by(500);
        assert_eq!(metrics.replay_events_processed.get(), 500);

        metrics.replays_completed_total.inc();
        assert_eq!(metrics.replays_completed_total.get(), 1);

        metrics.replays_failed_total.inc();
        assert_eq!(metrics.replays_failed_total.get(), 1);

        metrics.replay_duration_seconds.observe(5.5);
        assert_eq!(metrics.replay_duration_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_pipeline_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.pipelines_registered_total.set(2);
        assert_eq!(metrics.pipelines_registered_total.get(), 2);

        metrics
            .pipeline_events_processed
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .inc_by(250);
        assert_eq!(
            metrics
                .pipeline_events_processed
                .with_label_values(&["pipeline-1", "filter_pipeline"])
                .get(),
            250
        );

        metrics
            .pipeline_events_filtered
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .inc_by(40);
        assert_eq!(
            metrics
                .pipeline_events_filtered
                .with_label_values(&["pipeline-1", "filter_pipeline"])
                .get(),
            40
        );

        // `pipeline_errors_total` is labelled by name only.
        metrics
            .pipeline_errors_total
            .with_label_values(&["filter_pipeline"])
            .inc();
        assert_eq!(
            metrics
                .pipeline_errors_total
                .with_label_values(&["filter_pipeline"])
                .get(),
            1
        );

        metrics
            .pipeline_processing_duration
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .observe(0.15);
        assert_eq!(
            metrics
                .pipeline_processing_duration
                .with_label_values(&["pipeline-1", "filter_pipeline"])
                .get_sample_count(),
            1
        );

        metrics.pipeline_duration_seconds.observe(0.2);
        assert_eq!(metrics.pipeline_duration_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_metrics_encode() {
        let metrics = MetricsRegistry::new();

        metrics.events_ingested_total.inc_by(100);
        metrics.storage_events_total.set(1000);

        let encoded = metrics.encode().unwrap();

        assert!(encoded.contains("# TYPE allsource_events_ingested_total counter"));
        assert!(encoded.contains("allsource_events_ingested_total 100"));
        assert!(encoded.contains("# TYPE allsource_storage_events_total gauge"));
        assert!(encoded.contains("allsource_storage_events_total 1000"));
    }

    #[test]
    fn test_metrics_default() {
        // Every `new()` owns an independent registry and metric set.
        let first = MetricsRegistry::new();
        let second = MetricsRegistry::new();

        first.events_ingested_total.inc();
        assert_eq!(first.events_ingested_total.get(), 1);
        assert_eq!(second.events_ingested_total.get(), 0);
    }

    #[test]
    fn test_all_metrics_are_exported() {
        let metrics = MetricsRegistry::new();

        // Labelled families only appear in the output once a child exists.
        metrics.events_ingested_by_type.with_label_values(&["e"]).inc();
        metrics.queries_total.with_label_values(&["q"]).inc();
        metrics
            .query_duration_seconds
            .with_label_values(&["q"])
            .observe(0.1);
        metrics.query_results_total.with_label_values(&["q"]).inc();
        metrics
            .projection_events_processed
            .with_label_values(&["p"])
            .inc();
        metrics.projection_errors_total.with_label_values(&["p"]).inc();
        metrics
            .projection_processing_duration
            .with_label_values(&["p"])
            .observe(0.1);
        metrics
            .schema_validations_total
            .with_label_values(&["s", "success"])
            .inc();
        metrics
            .pipeline_events_processed
            .with_label_values(&["id", "name"])
            .inc();
        metrics
            .pipeline_events_filtered
            .with_label_values(&["id", "name"])
            .inc();
        metrics.pipeline_errors_total.with_label_values(&["name"]).inc();
        metrics
            .pipeline_processing_duration
            .with_label_values(&["id", "name"])
            .observe(0.1);
        metrics
            .http_requests_total
            .with_label_values(&["GET", "/", "200"])
            .inc();
        metrics
            .http_request_duration_seconds
            .with_label_values(&["GET", "/"])
            .observe(0.1);
        metrics
            .replication_follower_lag_seconds
            .with_label_values(&["f"])
            .set(1);

        let encoded = metrics.encode().unwrap();
        for name in [
            "events_ingested_total",
            "events_ingested_by_type",
            "ingestion_duration_seconds",
            "ingestion_errors_total",
            "queries_total",
            "query_duration_seconds",
            "query_results_total",
            "storage_events_total",
            "storage_entities_total",
            "storage_size_bytes",
            "parquet_files_total",
            "wal_segments_total",
            "projections_total",
            "projection_events_processed",
            "projection_errors_total",
            "projection_processing_duration_seconds",
            "projection_duration_seconds",
            "schemas_registered_total",
            "schema_validations_total",
            "schema_validation_duration_seconds",
            "replays_started_total",
            "replays_completed_total",
            "replays_failed_total",
            "replay_events_processed",
            "replay_duration_seconds",
            "pipelines_registered_total",
            "pipeline_events_processed",
            "pipeline_events_filtered",
            "pipeline_errors_total",
            "pipeline_processing_duration_seconds",
            "pipeline_duration_seconds",
            "snapshots_created_total",
            "snapshot_creation_duration_seconds",
            "snapshots_total",
            "compactions_total",
            "compaction_duration_seconds",
            "compaction_files_merged",
            "compaction_bytes_saved",
            "websocket_connections_active",
            "websocket_connections_total",
            "websocket_messages_sent",
            "websocket_errors_total",
            "http_requests_total",
            "http_request_duration_seconds",
            "http_requests_in_flight",
            "replication_followers_connected",
            "replication_wal_shipped_total",
            "replication_wal_shipped_bytes_total",
            "replication_follower_lag_seconds",
            "replication_acks_total",
            "replication_ack_wait_seconds",
            "replication_wal_received_total",
            "replication_wal_replayed_total",
            "replication_lag_seconds",
            "replication_connected",
            "replication_reconnects_total",
        ] {
            assert!(
                encoded.contains(&format!("# TYPE allsource_{name} ")),
                "metric allsource_{name} missing from encode() output"
            );
        }
    }

    #[test]
    fn test_websocket_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.websocket_connections_active.inc();
        assert_eq!(metrics.websocket_connections_active.get(), 1);

        metrics.websocket_connections_total.inc();
        assert_eq!(metrics.websocket_connections_total.get(), 1);

        metrics.websocket_messages_sent.inc_by(10);
        assert_eq!(metrics.websocket_messages_sent.get(), 10);

        metrics.websocket_connections_active.dec();
        assert_eq!(metrics.websocket_connections_active.get(), 0);

        metrics.websocket_errors_total.inc();
        assert_eq!(metrics.websocket_errors_total.get(), 1);
    }

    #[test]
    fn test_compaction_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.compactions_total.inc();
        assert_eq!(metrics.compactions_total.get(), 1);

        metrics.compaction_duration_seconds.observe(5.2);
        assert_eq!(metrics.compaction_duration_seconds.get_sample_count(), 1);

        metrics.compaction_files_merged.inc_by(5);
        assert_eq!(metrics.compaction_files_merged.get(), 5);

        metrics.compaction_bytes_saved.inc_by(1024 * 1024);
        assert_eq!(metrics.compaction_bytes_saved.get(), 1024 * 1024);
    }

    #[test]
    fn test_snapshot_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.snapshots_created_total.inc();
        assert_eq!(metrics.snapshots_created_total.get(), 1);

        metrics.snapshot_creation_duration.observe(0.5);
        assert_eq!(metrics.snapshot_creation_duration.get_sample_count(), 1);

        metrics.snapshots_total.set(10);
        assert_eq!(metrics.snapshots_total.get(), 10);
    }

    #[test]
    fn test_replication_leader_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.replication_followers_connected.set(2);
        assert_eq!(metrics.replication_followers_connected.get(), 2);

        metrics.replication_wal_shipped_total.inc_by(100);
        assert_eq!(metrics.replication_wal_shipped_total.get(), 100);

        metrics
            .replication_wal_shipped_bytes_total
            .inc_by(1024 * 1024);
        assert_eq!(
            metrics.replication_wal_shipped_bytes_total.get(),
            1024 * 1024
        );

        metrics
            .replication_follower_lag_seconds
            .with_label_values(&["follower-1"])
            .set(5);
        assert_eq!(
            metrics
                .replication_follower_lag_seconds
                .with_label_values(&["follower-1"])
                .get(),
            5
        );

        metrics.replication_acks_total.inc_by(50);
        assert_eq!(metrics.replication_acks_total.get(), 50);

        metrics.replication_ack_wait_seconds.observe(0.003);
        assert_eq!(metrics.replication_ack_wait_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_replication_follower_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.replication_wal_received_total.inc_by(200);
        assert_eq!(metrics.replication_wal_received_total.get(), 200);

        metrics.replication_wal_replayed_total.inc_by(195);
        assert_eq!(metrics.replication_wal_replayed_total.get(), 195);

        metrics.replication_lag_seconds.set(3);
        assert_eq!(metrics.replication_lag_seconds.get(), 3);

        metrics.replication_connected.set(1);
        assert_eq!(metrics.replication_connected.get(), 1);

        metrics.replication_reconnects_total.inc_by(2);
        assert_eq!(metrics.replication_reconnects_total.get(), 2);
    }

    #[test]
    fn test_replication_metrics_in_encode() {
        let metrics = MetricsRegistry::new();

        metrics.replication_followers_connected.set(1);
        metrics.replication_wal_shipped_total.inc();
        metrics.replication_connected.set(1);
        metrics.replication_ack_wait_seconds.observe(0.0015);

        let encoded = metrics.encode().unwrap();
        assert!(encoded.contains("allsource_replication_followers_connected"));
        assert!(encoded.contains("allsource_replication_wal_shipped_total"));
        assert!(encoded.contains("allsource_replication_connected"));
        // 0.002 is not a default Prometheus bucket, so this pins the custom layout.
        assert!(encoded.contains("allsource_replication_ack_wait_seconds_bucket{le=\"0.002\"}"));
    }

    #[test]
    fn test_http_metrics() {
        let metrics = MetricsRegistry::new();

        metrics
            .http_requests_total
            .with_label_values(&["GET", "/api/events", "200"])
            .inc();
        assert_eq!(
            metrics
                .http_requests_total
                .with_label_values(&["GET", "/api/events", "200"])
                .get(),
            1
        );

        metrics
            .http_request_duration_seconds
            .with_label_values(&["GET", "/api/events"])
            .observe(0.025);
        assert_eq!(
            metrics
                .http_request_duration_seconds
                .with_label_values(&["GET", "/api/events"])
                .get_sample_count(),
            1
        );

        metrics.http_requests_in_flight.inc();
        assert_eq!(metrics.http_requests_in_flight.get(), 1);

        metrics.http_requests_in_flight.dec();
        assert_eq!(metrics.http_requests_in_flight.get(), 0);
    }
}
1122
/// Plain-value write statistics for a single partition.
#[derive(Debug)]
pub struct PartitionStats {
    /// Partition these statistics describe.
    pub partition_id: u32,

    /// Number of events recorded for the partition.
    pub event_count: u64,

    /// Sum of all recorded write latencies, in nanoseconds.
    pub total_latency_ns: u64,

    /// Number of writes whose latency is included in `total_latency_ns`.
    pub write_count: u64,

    /// Smallest recorded write latency in nanoseconds; starts at `u64::MAX`
    /// so that any real observation is lower.
    pub min_latency_ns: u64,

    /// Largest recorded write latency in nanoseconds; starts at 0.
    pub max_latency_ns: u64,

    /// Number of errors recorded for the partition.
    pub error_count: u64,
}

impl PartitionStats {
    /// Empty statistics for `partition_id`: all counters zero, minimum latency at
    /// its `u64::MAX` starting value.
    fn new(partition_id: u32) -> Self {
        Self {
            partition_id,
            min_latency_ns: u64::MAX,
            max_latency_ns: 0,
            event_count: 0,
            write_count: 0,
            total_latency_ns: 0,
            error_count: 0,
        }
    }

    /// Mean write latency (integer nanoseconds, truncated), or `None` when no
    /// writes have been recorded.
    pub fn avg_latency(&self) -> Option<Duration> {
        match self.write_count {
            0 => None,
            writes => Some(Duration::from_nanos(self.total_latency_ns / writes)),
        }
    }
}
1175
1176#[derive(Debug, Clone, Serialize)]
1178pub struct PartitionImbalanceAlert {
1179 pub partition_id: u32,
1181
1182 pub event_count: u64,
1184
1185 pub average_count: f64,
1187
1188 pub ratio_to_average: f64,
1190
1191 pub message: String,
1193
1194 pub timestamp: chrono::DateTime<chrono::Utc>,
1196}
1197
/// Atomic counters for one partition, shared across writer threads.
struct PartitionMetricsEntry {
    /// Events recorded.
    event_count: AtomicU64,
    /// Sum of write latencies, in nanoseconds.
    total_latency_ns: AtomicU64,
    /// Writes contributing to `total_latency_ns`.
    write_count: AtomicU64,
    /// Smallest write latency seen (ns); starts at `u64::MAX`.
    min_latency_ns: AtomicU64,
    /// Largest write latency seen (ns); starts at 0.
    max_latency_ns: AtomicU64,
    /// Errors recorded.
    error_count: AtomicU64,
}

impl PartitionMetricsEntry {
    /// Fresh counters: everything zero except the minimum latency, which starts
    /// at `u64::MAX` so any recorded latency can lower it.
    fn new() -> Self {
        let zeroed = || AtomicU64::new(0);
        Self {
            event_count: zeroed(),
            total_latency_ns: zeroed(),
            write_count: zeroed(),
            min_latency_ns: AtomicU64::new(u64::MAX),
            max_latency_ns: zeroed(),
            error_count: zeroed(),
        }
    }
}
1220
1221pub struct PartitionMetrics {
1255 partition_count: u32,
1257
1258 partitions: Vec<PartitionMetricsEntry>,
1260
1261 partition_events_total: IntGaugeVec,
1263
1264 partition_write_latency: HistogramVec,
1266
1267 partition_errors_total: IntCounterVec,
1269
1270 registry: Registry,
1272
1273 started_at: Instant,
1275}
1276
1277impl PartitionMetrics {
1278 pub fn new(partition_count: u32) -> Self {
1283 let registry = Registry::new();
1284
1285 let partition_events_total = IntGaugeVec::new(
1287 Opts::new(
1288 "allsource_partition_events_total",
1289 "Total events per partition",
1290 ),
1291 &["partition_id"],
1292 )
1293 .expect("Failed to create partition_events_total metric");
1294
1295 let partition_write_latency = HistogramVec::new(
1297 HistogramOpts::new(
1298 "allsource_partition_write_latency_seconds",
1299 "Write latency per partition in seconds",
1300 )
1301 .buckets(vec![
1302 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
1303 ]),
1304 &["partition_id"],
1305 )
1306 .expect("Failed to create partition_write_latency metric");
1307
1308 let partition_errors_total = IntCounterVec::new(
1310 Opts::new(
1311 "allsource_partition_errors_total",
1312 "Total errors per partition",
1313 ),
1314 &["partition_id"],
1315 )
1316 .expect("Failed to create partition_errors_total metric");
1317
1318 registry
1320 .register(Box::new(partition_events_total.clone()))
1321 .expect("Failed to register partition_events_total");
1322 registry
1323 .register(Box::new(partition_write_latency.clone()))
1324 .expect("Failed to register partition_write_latency");
1325 registry
1326 .register(Box::new(partition_errors_total.clone()))
1327 .expect("Failed to register partition_errors_total");
1328
1329 let partitions = (0..partition_count)
1331 .map(|_| PartitionMetricsEntry::new())
1332 .collect();
1333
1334 Self {
1335 partition_count,
1336 partitions,
1337 partition_events_total,
1338 partition_write_latency,
1339 partition_errors_total,
1340 registry,
1341 started_at: Instant::now(),
1342 }
1343 }
1344
1345 pub fn with_default_partitions() -> Self {
1347 Self::new(32)
1348 }
1349
1350 #[inline]
1356 pub fn record_write(&self, partition_id: u32, latency: Duration) {
1357 if partition_id >= self.partition_count {
1358 return;
1359 }
1360
1361 let entry = &self.partitions[partition_id as usize];
1362 let latency_ns = latency.as_nanos() as u64;
1363
1364 entry.event_count.fetch_add(1, Ordering::Relaxed);
1366 entry.write_count.fetch_add(1, Ordering::Relaxed);
1367 entry
1368 .total_latency_ns
1369 .fetch_add(latency_ns, Ordering::Relaxed);
1370
1371 let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1373 while latency_ns < current_min {
1374 match entry.min_latency_ns.compare_exchange_weak(
1375 current_min,
1376 latency_ns,
1377 Ordering::Relaxed,
1378 Ordering::Relaxed,
1379 ) {
1380 Ok(_) => break,
1381 Err(actual) => current_min = actual,
1382 }
1383 }
1384
1385 let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1387 while latency_ns > current_max {
1388 match entry.max_latency_ns.compare_exchange_weak(
1389 current_max,
1390 latency_ns,
1391 Ordering::Relaxed,
1392 Ordering::Relaxed,
1393 ) {
1394 Ok(_) => break,
1395 Err(actual) => current_max = actual,
1396 }
1397 }
1398
1399 let partition_id_str = partition_id.to_string();
1401 self.partition_events_total
1402 .with_label_values(&[&partition_id_str])
1403 .set(entry.event_count.load(Ordering::Relaxed) as i64);
1404 self.partition_write_latency
1405 .with_label_values(&[&partition_id_str])
1406 .observe(latency.as_secs_f64());
1407 }
1408
1409 #[inline]
1414 pub fn record_error(&self, partition_id: u32) {
1415 if partition_id >= self.partition_count {
1416 return;
1417 }
1418
1419 let entry = &self.partitions[partition_id as usize];
1420 entry.error_count.fetch_add(1, Ordering::Relaxed);
1421
1422 let partition_id_str = partition_id.to_string();
1424 self.partition_errors_total
1425 .with_label_values(&[&partition_id_str])
1426 .inc();
1427 }
1428
1429 #[inline]
1436 pub fn record_batch_write(&self, partition_id: u32, count: u64, latency: Duration) {
1437 if partition_id >= self.partition_count {
1438 return;
1439 }
1440
1441 let entry = &self.partitions[partition_id as usize];
1442 let latency_ns = latency.as_nanos() as u64;
1443
1444 entry.event_count.fetch_add(count, Ordering::Relaxed);
1446 entry.write_count.fetch_add(1, Ordering::Relaxed);
1447 entry
1448 .total_latency_ns
1449 .fetch_add(latency_ns, Ordering::Relaxed);
1450
1451 let per_event_latency_ns = latency_ns / count.max(1);
1453
1454 let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1456 while per_event_latency_ns < current_min {
1457 match entry.min_latency_ns.compare_exchange_weak(
1458 current_min,
1459 per_event_latency_ns,
1460 Ordering::Relaxed,
1461 Ordering::Relaxed,
1462 ) {
1463 Ok(_) => break,
1464 Err(actual) => current_min = actual,
1465 }
1466 }
1467
1468 let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1470 while per_event_latency_ns > current_max {
1471 match entry.max_latency_ns.compare_exchange_weak(
1472 current_max,
1473 per_event_latency_ns,
1474 Ordering::Relaxed,
1475 Ordering::Relaxed,
1476 ) {
1477 Ok(_) => break,
1478 Err(actual) => current_max = actual,
1479 }
1480 }
1481
1482 let partition_id_str = partition_id.to_string();
1484 self.partition_events_total
1485 .with_label_values(&[&partition_id_str])
1486 .set(entry.event_count.load(Ordering::Relaxed) as i64);
1487 self.partition_write_latency
1488 .with_label_values(&[&partition_id_str])
1489 .observe(latency.as_secs_f64());
1490 }
1491
1492 pub fn get_partition_stats(&self, partition_id: u32) -> Option<PartitionStats> {
1494 if partition_id >= self.partition_count {
1495 return None;
1496 }
1497
1498 let entry = &self.partitions[partition_id as usize];
1499
1500 Some(PartitionStats {
1501 partition_id,
1502 event_count: entry.event_count.load(Ordering::Relaxed),
1503 total_latency_ns: entry.total_latency_ns.load(Ordering::Relaxed),
1504 write_count: entry.write_count.load(Ordering::Relaxed),
1505 min_latency_ns: entry.min_latency_ns.load(Ordering::Relaxed),
1506 max_latency_ns: entry.max_latency_ns.load(Ordering::Relaxed),
1507 error_count: entry.error_count.load(Ordering::Relaxed),
1508 })
1509 }
1510
1511 pub fn get_all_partition_stats(&self) -> Vec<PartitionStats> {
1513 (0..self.partition_count)
1514 .filter_map(|id| self.get_partition_stats(id))
1515 .collect()
1516 }
1517
1518 pub fn detect_partition_imbalance(&self) -> Vec<PartitionImbalanceAlert> {
1526 let mut alerts = Vec::new();
1527 let stats = self.get_all_partition_stats();
1528
1529 let total_events: u64 = stats.iter().map(|s| s.event_count).sum();
1531 let active_partitions = stats.iter().filter(|s| s.event_count > 0).count();
1532
1533 if active_partitions == 0 {
1534 return alerts;
1535 }
1536
1537 let average_count = total_events as f64 / active_partitions as f64;
1538 let imbalance_threshold = 2.0; for stat in stats {
1541 if stat.event_count == 0 {
1542 continue;
1543 }
1544
1545 let ratio = stat.event_count as f64 / average_count;
1546
1547 if ratio > imbalance_threshold {
1548 alerts.push(PartitionImbalanceAlert {
1549 partition_id: stat.partition_id,
1550 event_count: stat.event_count,
1551 average_count,
1552 ratio_to_average: ratio,
1553 message: format!(
1554 "Partition {} has {:.1}x average load ({} events vs {:.0} avg)",
1555 stat.partition_id, ratio, stat.event_count, average_count
1556 ),
1557 timestamp: chrono::Utc::now(),
1558 });
1559 }
1560 }
1561
1562 alerts
1563 }
1564
1565 pub fn partition_count(&self) -> u32 {
1567 self.partition_count
1568 }
1569
1570 pub fn total_events(&self) -> u64 {
1572 self.partitions
1573 .iter()
1574 .map(|e| e.event_count.load(Ordering::Relaxed))
1575 .sum()
1576 }
1577
1578 pub fn total_errors(&self) -> u64 {
1580 self.partitions
1581 .iter()
1582 .map(|e| e.error_count.load(Ordering::Relaxed))
1583 .sum()
1584 }
1585
1586 pub fn uptime(&self) -> Duration {
1588 self.started_at.elapsed()
1589 }
1590
1591 pub fn registry(&self) -> &Registry {
1593 &self.registry
1594 }
1595
1596 pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
1598 use prometheus::Encoder;
1599 let encoder = prometheus::TextEncoder::new();
1600 let metric_families = self.registry.gather();
1601 let mut buffer = Vec::new();
1602 encoder.encode(&metric_families, &mut buffer)?;
1603 Ok(String::from_utf8(buffer)?)
1604 }
1605
1606 pub fn get_distribution(&self) -> HashMap<u32, u64> {
1608 self.partitions
1609 .iter()
1610 .enumerate()
1611 .map(|(id, entry)| (id as u32, entry.event_count.load(Ordering::Relaxed)))
1612 .collect()
1613 }
1614
1615 pub fn reset(&self) {
1617 for entry in &self.partitions {
1618 entry.event_count.store(0, Ordering::Relaxed);
1619 entry.total_latency_ns.store(0, Ordering::Relaxed);
1620 entry.write_count.store(0, Ordering::Relaxed);
1621 entry.min_latency_ns.store(u64::MAX, Ordering::Relaxed);
1622 entry.max_latency_ns.store(0, Ordering::Relaxed);
1623 entry.error_count.store(0, Ordering::Relaxed);
1624 }
1625 }
1626}
1627
1628impl Default for PartitionMetrics {
1629 fn default() -> Self {
1630 Self::with_default_partitions()
1631 }
1632}
1633
#[cfg(test)]
mod partition_tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_partition_metrics_creation() {
        let metrics = PartitionMetrics::new(32);
        assert_eq!(metrics.partition_count(), 32);
        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    #[test]
    fn test_partition_metrics_default() {
        let metrics = PartitionMetrics::default();
        assert_eq!(metrics.partition_count(), 32);
    }

    #[test]
    fn test_zero_partitions() {
        let metrics = PartitionMetrics::new(0);
        assert_eq!(metrics.partition_count(), 0);
        assert!(metrics.get_all_partition_stats().is_empty());
        assert!(metrics.get_partition_stats(0).is_none());
        assert!(metrics.detect_partition_imbalance().is_empty());
        assert!(metrics.get_distribution().is_empty());
    }

    #[test]
    fn test_record_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(1, Duration::from_micros(150));

        let stats0 = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats0.event_count, 2);
        assert_eq!(stats0.write_count, 2);

        let stats1 = metrics.get_partition_stats(1).unwrap();
        assert_eq!(stats1.event_count, 1);
    }

    #[test]
    fn test_record_batch_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_batch_write(5, 100, Duration::from_millis(10));

        let stats = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats.event_count, 100);
        assert_eq!(stats.write_count, 1);
    }

    #[test]
    fn test_batch_write_latency_is_per_event() {
        let metrics = PartitionMetrics::new(4);

        // 10 events in 1ms: cumulative latency keeps the full duration,
        // min/max track the per-event share (100µs).
        metrics.record_batch_write(1, 10, Duration::from_millis(1));

        let stats = metrics.get_partition_stats(1).unwrap();
        assert_eq!(stats.total_latency_ns, 1_000_000);
        assert_eq!(stats.min_latency_ns, 100_000);
        assert_eq!(stats.max_latency_ns, 100_000);
    }

    #[test]
    fn test_record_error() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_error(3);
        metrics.record_error(3);
        metrics.record_error(5);

        let stats3 = metrics.get_partition_stats(3).unwrap();
        assert_eq!(stats3.error_count, 2);

        let stats5 = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats5.error_count, 1);

        assert_eq!(metrics.total_errors(), 3);
    }

    #[test]
    fn test_invalid_partition_id() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(100, Duration::from_micros(100));
        metrics.record_batch_write(100, 10, Duration::from_micros(100));
        metrics.record_error(100);

        assert!(metrics.get_partition_stats(100).is_none());
        // Rejected calls must not leak into any other partition.
        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    #[test]
    fn test_latency_tracking() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(0, Duration::from_micros(300));

        let stats = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats.min_latency_ns, 100_000);
        assert_eq!(stats.max_latency_ns, 300_000);

        let avg = stats.avg_latency().unwrap();
        assert_eq!(avg, Duration::from_nanos(200_000));
    }

    #[test]
    fn test_detect_partition_imbalance_no_imbalance() {
        let metrics = PartitionMetrics::new(4);

        for i in 0..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(
            alerts.is_empty(),
            "No alerts expected for balanced partitions"
        );
    }

    #[test]
    fn test_detect_partition_imbalance_hot_partition() {
        let metrics = PartitionMetrics::new(4);

        for _ in 0..500 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert_eq!(alerts.len(), 1, "Expected one alert for hot partition");
        assert_eq!(alerts[0].partition_id, 0);
        assert!(alerts[0].ratio_to_average > 2.0);
    }

    #[test]
    fn test_detect_partition_imbalance_empty() {
        let metrics = PartitionMetrics::new(4);

        let alerts = metrics.detect_partition_imbalance();
        assert!(alerts.is_empty(), "No alerts expected for empty metrics");
    }

    #[test]
    fn test_get_all_partition_stats() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(200));

        let all_stats = metrics.get_all_partition_stats();
        assert_eq!(all_stats.len(), 4);
        assert_eq!(all_stats[0].event_count, 1);
        assert_eq!(all_stats[1].event_count, 0);
        assert_eq!(all_stats[2].event_count, 1);
        assert_eq!(all_stats[3].event_count, 0);
    }

    #[test]
    fn test_prometheus_encoding() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(1, Duration::from_micros(200));
        metrics.record_error(0);

        let encoded = metrics.encode().unwrap();

        assert!(encoded.contains("allsource_partition_events_total"));
        assert!(encoded.contains("allsource_partition_write_latency"));
        assert!(encoded.contains("allsource_partition_errors_total"));
    }

    #[test]
    fn test_reset() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_error(1);

        assert_eq!(metrics.total_events(), 1);
        assert_eq!(metrics.total_errors(), 1);

        metrics.reset();

        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    #[test]
    fn test_reset_restores_latency_bounds() {
        let metrics = PartitionMetrics::new(2);

        metrics.record_write(0, Duration::from_micros(50));
        metrics.reset();

        let stats = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats.write_count, 0);
        assert_eq!(stats.total_latency_ns, 0);
        assert_eq!(stats.min_latency_ns, u64::MAX);
        assert_eq!(stats.max_latency_ns, 0);
    }

    #[test]
    fn test_concurrent_writes() {
        let metrics = Arc::new(PartitionMetrics::new(32));
        let mut handles = vec![];

        for _ in 0..8 {
            let metrics_clone = metrics.clone();
            let handle = thread::spawn(move || {
                for i in 0..1000 {
                    let partition_id = (i % 32) as u32;
                    metrics_clone.record_write(partition_id, Duration::from_micros(100));
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(metrics.total_events(), 8000);
    }

    #[test]
    fn test_get_distribution() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(100));

        let distribution = metrics.get_distribution();

        assert_eq!(distribution.get(&0), Some(&2));
        assert_eq!(distribution.get(&1), Some(&0));
        assert_eq!(distribution.get(&2), Some(&1));
        assert_eq!(distribution.get(&3), Some(&0));
    }

    #[test]
    fn test_partition_stats_avg_latency_none() {
        let stats = PartitionStats::new(0);
        assert!(stats.avg_latency().is_none());
    }

    #[test]
    fn test_alert_message_format() {
        let metrics = PartitionMetrics::new(4);

        for _ in 0..1000 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(!alerts.is_empty());

        let alert = &alerts[0];
        assert!(alert.message.contains("Partition 0"));
        assert!(alert.message.contains("average load"));
    }

    #[test]
    fn test_uptime() {
        let metrics = PartitionMetrics::new(4);

        thread::sleep(Duration::from_millis(10));

        let uptime = metrics.uptime();
        assert!(uptime.as_millis() >= 10);
    }
}