use prometheus::{
    Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
    Registry,
};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
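
/// Central Prometheus metrics registry covering the service's ingestion, query,
/// storage, projection, schema, replay, pipeline, snapshot, compaction,
/// WebSocket, and HTTP subsystems. All collectors are registered against a
/// single `Registry` and exposed as public fields so call sites can update them
/// directly.
///
/// Illustrative usage (a minimal sketch; the surrounding server wiring is
/// assumed, not shown here):
///
/// ```ignore
/// let metrics = MetricsRegistry::new();
/// metrics.events_ingested_total.inc();
/// metrics
///     .events_ingested_by_type
///     .with_label_values(&["user.created"])
///     .inc();
/// let exposition_text = metrics.encode().unwrap();
/// ```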
pub struct MetricsRegistry {
    registry: Registry,

    // Ingestion metrics
    pub events_ingested_total: IntCounter,
    pub events_ingested_by_type: IntCounterVec,
    pub ingestion_duration_seconds: Histogram,
    pub ingestion_errors_total: IntCounter,

    // Query metrics
    pub queries_total: IntCounterVec,
    pub query_duration_seconds: HistogramVec,
    pub query_results_total: IntCounterVec,

    // Storage metrics
    pub storage_events_total: IntGauge,
    pub storage_entities_total: IntGauge,
    pub storage_size_bytes: IntGauge,
    pub parquet_files_total: IntGauge,
    pub wal_segments_total: IntGauge,

    // Projection metrics
    pub projections_total: IntGauge,
    pub projection_events_processed: IntCounterVec,
    pub projection_errors_total: IntCounterVec,
    pub projection_processing_duration: HistogramVec,
    pub projection_duration_seconds: Histogram,

    // Schema registry metrics
    pub schemas_registered_total: IntCounter,
    pub schema_validations_total: IntCounterVec,
    pub schema_validation_duration: Histogram,

    // Replay metrics
    pub replays_started_total: IntCounter,
    pub replays_completed_total: IntCounter,
    pub replays_failed_total: IntCounter,
    pub replay_events_processed: IntCounter,
    pub replay_duration_seconds: Histogram,

    // Pipeline metrics
    pub pipelines_registered_total: IntGauge,
    pub pipeline_events_processed: IntCounterVec,
    pub pipeline_events_filtered: IntCounterVec,
    pub pipeline_errors_total: IntCounterVec,
    pub pipeline_processing_duration: HistogramVec,
    pub pipeline_duration_seconds: Histogram,

    // Snapshot metrics
    pub snapshots_created_total: IntCounter,
    pub snapshot_creation_duration: Histogram,
    pub snapshots_total: IntGauge,

    // Compaction metrics
    pub compactions_total: IntCounter,
    pub compaction_duration_seconds: Histogram,
    pub compaction_files_merged: IntCounter,
    pub compaction_bytes_saved: IntCounter,

    // WebSocket metrics
    pub websocket_connections_active: IntGauge,
    pub websocket_connections_total: IntCounter,
    pub websocket_messages_sent: IntCounter,
    pub websocket_errors_total: IntCounter,

    // HTTP metrics
    pub http_requests_total: IntCounterVec,
    pub http_request_duration_seconds: HistogramVec,
    pub http_requests_in_flight: IntGauge,
}

impl MetricsRegistry {
    pub fn new() -> Arc<Self> {
        let registry = Registry::new();

        let events_ingested_total = IntCounter::with_opts(Opts::new(
            "allsource_events_ingested_total",
            "Total number of events ingested",
        ))
        .unwrap();

        let events_ingested_by_type = IntCounterVec::new(
            Opts::new(
                "allsource_events_ingested_by_type",
                "Events ingested by type",
            ),
            &["event_type"],
        )
        .unwrap();

        let ingestion_duration_seconds = Histogram::with_opts(HistogramOpts::new(
            "allsource_ingestion_duration_seconds",
            "Event ingestion duration in seconds",
        ))
        .unwrap();

        let ingestion_errors_total = IntCounter::with_opts(Opts::new(
            "allsource_ingestion_errors_total",
            "Total number of ingestion errors",
        ))
        .unwrap();

        let queries_total = IntCounterVec::new(
            Opts::new("allsource_queries_total", "Total number of queries"),
            &["query_type"],
        )
        .unwrap();

        let query_duration_seconds = HistogramVec::new(
            HistogramOpts::new(
                "allsource_query_duration_seconds",
                "Query duration in seconds",
            ),
            &["query_type"],
        )
        .unwrap();

        let query_results_total = IntCounterVec::new(
            Opts::new(
                "allsource_query_results_total",
                "Total number of events returned by queries",
            ),
            &["query_type"],
        )
        .unwrap();

        let storage_events_total = IntGauge::with_opts(Opts::new(
            "allsource_storage_events_total",
            "Total number of events in storage",
        ))
        .unwrap();

        let storage_entities_total = IntGauge::with_opts(Opts::new(
            "allsource_storage_entities_total",
            "Total number of entities in storage",
        ))
        .unwrap();

        let storage_size_bytes = IntGauge::with_opts(Opts::new(
            "allsource_storage_size_bytes",
            "Total storage size in bytes",
        ))
        .unwrap();

        let parquet_files_total = IntGauge::with_opts(Opts::new(
            "allsource_parquet_files_total",
            "Number of Parquet files",
        ))
        .unwrap();

        let wal_segments_total = IntGauge::with_opts(Opts::new(
            "allsource_wal_segments_total",
            "Number of WAL segments",
        ))
        .unwrap();

        let projection_events_processed = IntCounterVec::new(
            Opts::new(
                "allsource_projection_events_processed",
                "Events processed by projections",
            ),
            &["projection_name"],
        )
        .unwrap();

        let projection_errors_total = IntCounterVec::new(
            Opts::new(
                "allsource_projection_errors_total",
                "Total projection errors",
            ),
            &["projection_name"],
        )
        .unwrap();

        let projection_processing_duration = HistogramVec::new(
            HistogramOpts::new(
                "allsource_projection_processing_duration_seconds",
                "Projection processing duration",
            ),
            &["projection_name"],
        )
        .unwrap();

        let projections_total = IntGauge::with_opts(Opts::new(
            "allsource_projections_total",
            "Number of registered projections",
        ))
        .unwrap();

        let projection_duration_seconds = Histogram::with_opts(HistogramOpts::new(
            "allsource_projection_duration_seconds",
            "Overall projection manager processing duration",
        ))
        .unwrap();

        let schemas_registered_total = IntCounter::with_opts(Opts::new(
            "allsource_schemas_registered_total",
            "Total number of schemas registered",
        ))
        .unwrap();

        let schema_validations_total = IntCounterVec::new(
            Opts::new(
                "allsource_schema_validations_total",
                "Schema validations by result",
            ),
            &["subject", "result"],
        )
        .unwrap();

        let schema_validation_duration = Histogram::with_opts(HistogramOpts::new(
            "allsource_schema_validation_duration_seconds",
            "Schema validation duration",
        ))
        .unwrap();

        let replays_started_total = IntCounter::with_opts(Opts::new(
            "allsource_replays_started_total",
            "Total replays started",
        ))
        .unwrap();

        let replays_completed_total = IntCounter::with_opts(Opts::new(
            "allsource_replays_completed_total",
            "Total replays completed",
        ))
        .unwrap();

        let replays_failed_total = IntCounter::with_opts(Opts::new(
            "allsource_replays_failed_total",
            "Total replays failed",
        ))
        .unwrap();

        let replay_events_processed = IntCounter::with_opts(Opts::new(
            "allsource_replay_events_processed",
            "Events processed during replays",
        ))
        .unwrap();

        let replay_duration_seconds = Histogram::with_opts(HistogramOpts::new(
            "allsource_replay_duration_seconds",
            "Replay duration",
        ))
        .unwrap();

        let pipelines_registered_total = IntGauge::with_opts(Opts::new(
            "allsource_pipelines_registered_total",
            "Number of registered pipelines",
        ))
        .unwrap();

        let pipeline_events_processed = IntCounterVec::new(
            Opts::new(
                "allsource_pipeline_events_processed",
                "Events processed by pipelines",
            ),
            &["pipeline_id", "pipeline_name"],
        )
        .unwrap();

        let pipeline_events_filtered = IntCounterVec::new(
            Opts::new(
                "allsource_pipeline_events_filtered",
                "Events filtered by pipelines",
            ),
            &["pipeline_id", "pipeline_name"],
        )
        .unwrap();

        let pipeline_processing_duration = HistogramVec::new(
            HistogramOpts::new(
                "allsource_pipeline_processing_duration_seconds",
                "Pipeline processing duration",
            ),
            &["pipeline_id", "pipeline_name"],
        )
        .unwrap();

        let pipeline_errors_total = IntCounterVec::new(
            Opts::new("allsource_pipeline_errors_total", "Total pipeline errors"),
            &["pipeline_name"],
        )
        .unwrap();

        let pipeline_duration_seconds = Histogram::with_opts(HistogramOpts::new(
            "allsource_pipeline_duration_seconds",
            "Overall pipeline manager processing duration",
        ))
        .unwrap();

        let snapshots_created_total = IntCounter::with_opts(Opts::new(
            "allsource_snapshots_created_total",
            "Total snapshots created",
        ))
        .unwrap();

        let snapshot_creation_duration = Histogram::with_opts(HistogramOpts::new(
            "allsource_snapshot_creation_duration_seconds",
            "Snapshot creation duration",
        ))
        .unwrap();

        let snapshots_total = IntGauge::with_opts(Opts::new(
            "allsource_snapshots_total",
            "Total number of snapshots",
        ))
        .unwrap();

        let compactions_total = IntCounter::with_opts(Opts::new(
            "allsource_compactions_total",
            "Total compactions performed",
        ))
        .unwrap();

        let compaction_duration_seconds = Histogram::with_opts(HistogramOpts::new(
            "allsource_compaction_duration_seconds",
            "Compaction duration",
        ))
        .unwrap();

        let compaction_files_merged = IntCounter::with_opts(Opts::new(
            "allsource_compaction_files_merged",
            "Files merged during compaction",
        ))
        .unwrap();

        let compaction_bytes_saved = IntCounter::with_opts(Opts::new(
            "allsource_compaction_bytes_saved",
            "Bytes saved by compaction",
        ))
        .unwrap();

        let websocket_connections_active = IntGauge::with_opts(Opts::new(
            "allsource_websocket_connections_active",
            "Active WebSocket connections",
        ))
        .unwrap();

        let websocket_connections_total = IntCounter::with_opts(Opts::new(
            "allsource_websocket_connections_total",
            "Total WebSocket connections",
        ))
        .unwrap();

        let websocket_messages_sent = IntCounter::with_opts(Opts::new(
            "allsource_websocket_messages_sent",
            "WebSocket messages sent",
        ))
        .unwrap();

        let websocket_errors_total = IntCounter::with_opts(Opts::new(
            "allsource_websocket_errors_total",
            "WebSocket errors",
        ))
        .unwrap();

        let http_requests_total = IntCounterVec::new(
            Opts::new("allsource_http_requests_total", "Total HTTP requests"),
            &["method", "endpoint", "status"],
        )
        .unwrap();

        let http_request_duration_seconds = HistogramVec::new(
            HistogramOpts::new(
                "allsource_http_request_duration_seconds",
                "HTTP request duration",
            ),
            &["method", "endpoint"],
        )
        .unwrap();

        let http_requests_in_flight = IntGauge::with_opts(Opts::new(
            "allsource_http_requests_in_flight",
            "HTTP requests currently being processed",
        ))
        .unwrap();

        registry
            .register(Box::new(events_ingested_total.clone()))
            .unwrap();
        registry
            .register(Box::new(events_ingested_by_type.clone()))
            .unwrap();
        registry
            .register(Box::new(ingestion_duration_seconds.clone()))
            .unwrap();
        registry
            .register(Box::new(ingestion_errors_total.clone()))
            .unwrap();

        registry.register(Box::new(queries_total.clone())).unwrap();
        registry
            .register(Box::new(query_duration_seconds.clone()))
            .unwrap();
        registry
            .register(Box::new(query_results_total.clone()))
            .unwrap();

        registry
            .register(Box::new(storage_events_total.clone()))
            .unwrap();
        registry
            .register(Box::new(storage_entities_total.clone()))
            .unwrap();
        registry
            .register(Box::new(storage_size_bytes.clone()))
            .unwrap();
        registry
            .register(Box::new(parquet_files_total.clone()))
            .unwrap();
        registry
            .register(Box::new(wal_segments_total.clone()))
            .unwrap();

        registry
            .register(Box::new(projection_events_processed.clone()))
            .unwrap();
        registry
            .register(Box::new(projection_errors_total.clone()))
            .unwrap();
        registry
            .register(Box::new(projection_processing_duration.clone()))
            .unwrap();
        registry
            .register(Box::new(projections_total.clone()))
            .unwrap();
        registry
            .register(Box::new(projection_duration_seconds.clone()))
            .unwrap();

        registry
            .register(Box::new(schemas_registered_total.clone()))
            .unwrap();
        registry
            .register(Box::new(schema_validations_total.clone()))
            .unwrap();
        registry
            .register(Box::new(schema_validation_duration.clone()))
            .unwrap();

        registry
            .register(Box::new(replays_started_total.clone()))
            .unwrap();
        registry
            .register(Box::new(replays_completed_total.clone()))
            .unwrap();
        registry
            .register(Box::new(replays_failed_total.clone()))
            .unwrap();
        registry
            .register(Box::new(replay_events_processed.clone()))
            .unwrap();
        registry
            .register(Box::new(replay_duration_seconds.clone()))
            .unwrap();

        registry
            .register(Box::new(pipelines_registered_total.clone()))
            .unwrap();
        registry
            .register(Box::new(pipeline_events_processed.clone()))
            .unwrap();
        registry
            .register(Box::new(pipeline_events_filtered.clone()))
            .unwrap();
        registry
            .register(Box::new(pipeline_processing_duration.clone()))
            .unwrap();
        registry
            .register(Box::new(pipeline_errors_total.clone()))
            .unwrap();
        registry
            .register(Box::new(pipeline_duration_seconds.clone()))
            .unwrap();

        registry
            .register(Box::new(snapshots_created_total.clone()))
            .unwrap();
        registry
            .register(Box::new(snapshot_creation_duration.clone()))
            .unwrap();
        registry
            .register(Box::new(snapshots_total.clone()))
            .unwrap();

        registry
            .register(Box::new(compactions_total.clone()))
            .unwrap();
        registry
            .register(Box::new(compaction_duration_seconds.clone()))
            .unwrap();
        registry
            .register(Box::new(compaction_files_merged.clone()))
            .unwrap();
        registry
            .register(Box::new(compaction_bytes_saved.clone()))
            .unwrap();

        registry
            .register(Box::new(websocket_connections_active.clone()))
            .unwrap();
        registry
            .register(Box::new(websocket_connections_total.clone()))
            .unwrap();
        registry
            .register(Box::new(websocket_messages_sent.clone()))
            .unwrap();
        registry
            .register(Box::new(websocket_errors_total.clone()))
            .unwrap();

        registry
            .register(Box::new(http_requests_total.clone()))
            .unwrap();
        registry
            .register(Box::new(http_request_duration_seconds.clone()))
            .unwrap();
        registry
            .register(Box::new(http_requests_in_flight.clone()))
            .unwrap();

        Arc::new(Self {
            registry,
            events_ingested_total,
            events_ingested_by_type,
            ingestion_duration_seconds,
            ingestion_errors_total,
            queries_total,
            query_duration_seconds,
            query_results_total,
            storage_events_total,
            storage_entities_total,
            storage_size_bytes,
            parquet_files_total,
            wal_segments_total,
            projection_events_processed,
            projection_errors_total,
            projection_processing_duration,
            projections_total,
            projection_duration_seconds,
            schemas_registered_total,
            schema_validations_total,
            schema_validation_duration,
            replays_started_total,
            replays_completed_total,
            replays_failed_total,
            replay_events_processed,
            replay_duration_seconds,
            pipelines_registered_total,
            pipeline_events_processed,
            pipeline_events_filtered,
            pipeline_processing_duration,
            pipeline_errors_total,
            pipeline_duration_seconds,
            snapshots_created_total,
            snapshot_creation_duration,
            snapshots_total,
            compactions_total,
            compaction_duration_seconds,
            compaction_files_merged,
            compaction_bytes_saved,
            websocket_connections_active,
            websocket_connections_total,
            websocket_messages_sent,
            websocket_errors_total,
            http_requests_total,
            http_request_duration_seconds,
            http_requests_in_flight,
        })
    }

    pub fn registry(&self) -> &Registry {
        &self.registry
    }
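
    /// Encodes all registered metrics in the Prometheus text exposition format.
    ///
    /// Sketch of how a metrics endpoint handler might use this (the HTTP layer
    /// itself is assumed, not shown here):
    ///
    /// ```ignore
    /// let body = metrics.encode().unwrap_or_default();
    /// // serve `body` with content type "text/plain; version=0.0.4"
    /// ```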
    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
        use prometheus::Encoder;
        let encoder = prometheus::TextEncoder::new();
        let metric_families = self.registry.gather();
        let mut buffer = Vec::new();
        encoder.encode(&metric_families, &mut buffer)?;
        Ok(String::from_utf8(buffer)?)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_registry_creation() {
        let metrics = MetricsRegistry::new();
        assert_eq!(metrics.events_ingested_total.get(), 0);
        assert_eq!(metrics.storage_events_total.get(), 0);
    }

    #[test]
    fn test_event_ingestion_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.events_ingested_total.inc();
        assert_eq!(metrics.events_ingested_total.get(), 1);

        metrics
            .events_ingested_by_type
            .with_label_values(&["user.created"])
            .inc();
        assert_eq!(
            metrics
                .events_ingested_by_type
                .with_label_values(&["user.created"])
                .get(),
            1
        );

        metrics.ingestion_duration_seconds.observe(0.1);
    }

    #[test]
    fn test_query_metrics() {
        let metrics = MetricsRegistry::new();

        metrics
            .queries_total
            .with_label_values(&["entity_id"])
            .inc();
        assert_eq!(
            metrics
                .queries_total
                .with_label_values(&["entity_id"])
                .get(),
            1
        );

        metrics
            .query_duration_seconds
            .with_label_values(&["entity_id"])
            .observe(0.05);

        metrics
            .query_results_total
            .with_label_values(&["entity_id"])
            .inc_by(10);
    }

    #[test]
    fn test_storage_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.storage_events_total.set(1000);
        assert_eq!(metrics.storage_events_total.get(), 1000);

        metrics.storage_entities_total.set(50);
        assert_eq!(metrics.storage_entities_total.get(), 50);

        metrics.storage_size_bytes.set(1024 * 1024);
        assert_eq!(metrics.storage_size_bytes.get(), 1024 * 1024);

        metrics.parquet_files_total.set(5);
        metrics.wal_segments_total.set(3);
    }

    #[test]
    fn test_projection_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.projections_total.set(3);
        assert_eq!(metrics.projections_total.get(), 3);

        metrics
            .projection_events_processed
            .with_label_values(&["user_snapshot"])
            .inc_by(100);

        metrics
            .projection_processing_duration
            .with_label_values(&["user_snapshot"])
            .observe(0.2);

        metrics
            .projection_errors_total
            .with_label_values(&["user_snapshot"])
            .inc();
    }

    #[test]
    fn test_schema_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.schemas_registered_total.inc();
        assert_eq!(metrics.schemas_registered_total.get(), 1);

        metrics
            .schema_validations_total
            .with_label_values(&["user.schema", "success"])
            .inc();

        metrics
            .schema_validations_total
            .with_label_values(&["order.schema", "failure"])
            .inc();

        metrics.schema_validation_duration.observe(0.01);
    }

    #[test]
    fn test_replay_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.replays_started_total.inc();
        assert_eq!(metrics.replays_started_total.get(), 1);

        metrics.replay_events_processed.inc_by(500);
        assert_eq!(metrics.replay_events_processed.get(), 500);

        metrics.replays_completed_total.inc();
        assert_eq!(metrics.replays_completed_total.get(), 1);

        metrics.replay_duration_seconds.observe(5.5);
    }

    #[test]
    fn test_pipeline_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.pipelines_registered_total.set(2);
        assert_eq!(metrics.pipelines_registered_total.get(), 2);

        metrics
            .pipeline_events_processed
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .inc_by(250);

        metrics
            .pipeline_errors_total
            .with_label_values(&["filter_pipeline"])
            .inc();

        metrics
            .pipeline_processing_duration
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .observe(0.15);
    }

    #[test]
    fn test_metrics_encode() {
        let metrics = MetricsRegistry::new();

        metrics.events_ingested_total.inc_by(100);
        metrics.storage_events_total.set(1000);

        let encoded = metrics.encode().unwrap();

        assert!(encoded.contains("events_ingested_total"));
        assert!(encoded.contains("storage_events_total"));
    }

    #[test]
    fn test_metrics_default() {
        let metrics = MetricsRegistry::new();
        assert_eq!(metrics.events_ingested_total.get(), 0);
    }

    #[test]
    fn test_websocket_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.websocket_connections_active.inc();
        assert_eq!(metrics.websocket_connections_active.get(), 1);

        metrics.websocket_connections_total.inc();

        metrics.websocket_messages_sent.inc_by(10);
        assert_eq!(metrics.websocket_messages_sent.get(), 10);

        metrics.websocket_connections_active.dec();
        assert_eq!(metrics.websocket_connections_active.get(), 0);

        metrics.websocket_errors_total.inc();
    }

    #[test]
    fn test_compaction_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.compactions_total.inc();
        assert_eq!(metrics.compactions_total.get(), 1);

        metrics.compaction_duration_seconds.observe(5.2);

        metrics.compaction_files_merged.inc_by(5);

        metrics.compaction_bytes_saved.inc_by(1024 * 1024);
    }

    #[test]
    fn test_snapshot_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.snapshots_created_total.inc();
        assert_eq!(metrics.snapshots_created_total.get(), 1);

        metrics.snapshot_creation_duration.observe(0.5);

        metrics.snapshots_total.set(10);
        assert_eq!(metrics.snapshots_total.get(), 10);
    }

    #[test]
    fn test_http_metrics() {
        let metrics = MetricsRegistry::new();

        metrics
            .http_requests_total
            .with_label_values(&["GET", "/api/events", "200"])
            .inc();

        metrics
            .http_request_duration_seconds
            .with_label_values(&["GET", "/api/events"])
            .observe(0.025);

        metrics.http_requests_in_flight.inc();
        assert_eq!(metrics.http_requests_in_flight.get(), 1);

        metrics.http_requests_in_flight.dec();
        assert_eq!(metrics.http_requests_in_flight.get(), 0);
    }
}
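
/// Point-in-time statistics for a single partition, read from the atomic
/// counters maintained by [`PartitionMetrics`].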
#[derive(Debug)]
pub struct PartitionStats {
    /// Partition this snapshot describes.
    pub partition_id: u32,

    /// Total events recorded on this partition.
    pub event_count: u64,

    /// Sum of all recorded write latencies, in nanoseconds.
    pub total_latency_ns: u64,

    /// Number of write operations recorded.
    pub write_count: u64,

    /// Smallest recorded latency in nanoseconds (`u64::MAX` if nothing recorded).
    pub min_latency_ns: u64,

    /// Largest recorded latency in nanoseconds.
    pub max_latency_ns: u64,

    /// Number of errors recorded on this partition.
    pub error_count: u64,
}

impl PartitionStats {
    fn new(partition_id: u32) -> Self {
        Self {
            partition_id,
            event_count: 0,
            total_latency_ns: 0,
            write_count: 0,
            min_latency_ns: u64::MAX,
            max_latency_ns: 0,
            error_count: 0,
        }
    }
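
    /// Average write latency across all recorded writes, or `None` if nothing
    /// has been recorded yet. For example, two writes totalling 300_000 ns
    /// yield an average of `Duration::from_nanos(150_000)`.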
    pub fn avg_latency(&self) -> Option<Duration> {
        if self.write_count == 0 {
            None
        } else {
            Some(Duration::from_nanos(self.total_latency_ns / self.write_count))
        }
    }
}

/// Alert raised when one partition carries a disproportionate share of events
/// compared to the average across active partitions.
#[derive(Debug, Clone, Serialize)]
pub struct PartitionImbalanceAlert {
    /// The hot partition.
    pub partition_id: u32,

    /// Event count on the hot partition.
    pub event_count: u64,

    /// Average event count across active partitions.
    pub average_count: f64,

    /// How many times the average this partition's count is.
    pub ratio_to_average: f64,

    /// Human-readable description of the imbalance.
    pub message: String,

    /// When the imbalance was detected.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}

/// Lock-free per-partition counters, updated with relaxed atomics on the write path.
struct PartitionMetricsEntry {
    event_count: AtomicU64,
    total_latency_ns: AtomicU64,
    write_count: AtomicU64,
    min_latency_ns: AtomicU64,
    max_latency_ns: AtomicU64,
    error_count: AtomicU64,
}

impl PartitionMetricsEntry {
    fn new() -> Self {
        Self {
            event_count: AtomicU64::new(0),
            total_latency_ns: AtomicU64::new(0),
            write_count: AtomicU64::new(0),
            min_latency_ns: AtomicU64::new(u64::MAX),
            max_latency_ns: AtomicU64::new(0),
            error_count: AtomicU64::new(0),
        }
    }
}
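
/// Tracks per-partition write throughput, latency, and errors, mirroring the
/// in-process atomic counters into Prometheus collectors.
///
/// Illustrative usage (a minimal sketch; partition ids come from the caller's
/// own partitioning scheme):
///
/// ```ignore
/// let metrics = PartitionMetrics::new(32);
/// metrics.record_write(7, Duration::from_micros(120));
/// metrics.record_batch_write(7, 64, Duration::from_millis(3));
///
/// if let Some(stats) = metrics.get_partition_stats(7) {
///     println!("partition 7 avg latency: {:?}", stats.avg_latency());
/// }
/// for alert in metrics.detect_partition_imbalance() {
///     eprintln!("{}", alert.message);
/// }
/// ```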
pub struct PartitionMetrics {
    /// Number of partitions being tracked.
    partition_count: u32,

    /// Per-partition atomic counters, indexed by partition id.
    partitions: Vec<PartitionMetricsEntry>,

    /// Prometheus gauge: total events per partition.
    partition_events_total: IntGaugeVec,

    /// Prometheus histogram: write latency per partition.
    partition_write_latency: HistogramVec,

    /// Prometheus counter: errors per partition.
    partition_errors_total: IntCounterVec,

    /// Registry holding the partition-level collectors.
    registry: Registry,

    /// When this metrics instance was created.
    started_at: Instant,
}

impl PartitionMetrics {
    pub fn new(partition_count: u32) -> Self {
        let registry = Registry::new();

        let partition_events_total = IntGaugeVec::new(
            Opts::new(
                "allsource_partition_events_total",
                "Total events per partition",
            ),
            &["partition_id"],
        )
        .expect("Failed to create partition_events_total metric");

        let partition_write_latency = HistogramVec::new(
            HistogramOpts::new(
                "allsource_partition_write_latency_seconds",
                "Write latency per partition in seconds",
            )
            .buckets(vec![
                0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
            ]),
            &["partition_id"],
        )
        .expect("Failed to create partition_write_latency metric");

        let partition_errors_total = IntCounterVec::new(
            Opts::new(
                "allsource_partition_errors_total",
                "Total errors per partition",
            ),
            &["partition_id"],
        )
        .expect("Failed to create partition_errors_total metric");

        registry
            .register(Box::new(partition_events_total.clone()))
            .expect("Failed to register partition_events_total");
        registry
            .register(Box::new(partition_write_latency.clone()))
            .expect("Failed to register partition_write_latency");
        registry
            .register(Box::new(partition_errors_total.clone()))
            .expect("Failed to register partition_errors_total");

        let partitions = (0..partition_count)
            .map(|_| PartitionMetricsEntry::new())
            .collect();

        Self {
            partition_count,
            partitions,
            partition_events_total,
            partition_write_latency,
            partition_errors_total,
            registry,
            started_at: Instant::now(),
        }
    }

    pub fn with_default_partitions() -> Self {
        Self::new(32)
    }
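
    /// Records a single write to `partition_id` with the observed latency.
    ///
    /// This is a hot-path method: counters are updated with relaxed atomics and
    /// the min/max latencies are maintained with compare-exchange loops, so no
    /// locks are taken. Writes to an out-of-range partition id are ignored.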
    #[inline]
    pub fn record_write(&self, partition_id: u32, latency: Duration) {
        if partition_id >= self.partition_count {
            return;
        }

        let entry = &self.partitions[partition_id as usize];
        let latency_ns = latency.as_nanos() as u64;

        entry.event_count.fetch_add(1, Ordering::Relaxed);
        entry.write_count.fetch_add(1, Ordering::Relaxed);
        entry.total_latency_ns.fetch_add(latency_ns, Ordering::Relaxed);

        // Update the minimum latency via a CAS loop.
        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
        while latency_ns < current_min {
            match entry.min_latency_ns.compare_exchange_weak(
                current_min,
                latency_ns,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => current_min = actual,
            }
        }

        // Update the maximum latency via a CAS loop.
        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
        while latency_ns > current_max {
            match entry.max_latency_ns.compare_exchange_weak(
                current_max,
                latency_ns,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => current_max = actual,
            }
        }

        // Mirror the counters into the Prometheus collectors.
        let partition_id_str = partition_id.to_string();
        self.partition_events_total
            .with_label_values(&[&partition_id_str])
            .set(entry.event_count.load(Ordering::Relaxed) as i64);
        self.partition_write_latency
            .with_label_values(&[&partition_id_str])
            .observe(latency.as_secs_f64());
    }
    #[inline]
    pub fn record_error(&self, partition_id: u32) {
        if partition_id >= self.partition_count {
            return;
        }

        let entry = &self.partitions[partition_id as usize];
        entry.error_count.fetch_add(1, Ordering::Relaxed);

        let partition_id_str = partition_id.to_string();
        self.partition_errors_total
            .with_label_values(&[&partition_id_str])
            .inc();
    }
    #[inline]
    pub fn record_batch_write(&self, partition_id: u32, count: u64, latency: Duration) {
        if partition_id >= self.partition_count {
            return;
        }

        let entry = &self.partitions[partition_id as usize];
        let latency_ns = latency.as_nanos() as u64;

        entry.event_count.fetch_add(count, Ordering::Relaxed);
        entry.write_count.fetch_add(1, Ordering::Relaxed);
        entry.total_latency_ns.fetch_add(latency_ns, Ordering::Relaxed);

        // Approximate per-event latency for min/max tracking.
        let per_event_latency_ns = latency_ns / count.max(1);

        // Update the minimum latency via a CAS loop.
        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
        while per_event_latency_ns < current_min {
            match entry.min_latency_ns.compare_exchange_weak(
                current_min,
                per_event_latency_ns,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => current_min = actual,
            }
        }

        // Update the maximum latency via a CAS loop.
        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
        while per_event_latency_ns > current_max {
            match entry.max_latency_ns.compare_exchange_weak(
                current_max,
                per_event_latency_ns,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => current_max = actual,
            }
        }

        // Mirror the counters into the Prometheus collectors.
        let partition_id_str = partition_id.to_string();
        self.partition_events_total
            .with_label_values(&[&partition_id_str])
            .set(entry.event_count.load(Ordering::Relaxed) as i64);
        self.partition_write_latency
            .with_label_values(&[&partition_id_str])
            .observe(latency.as_secs_f64());
    }

    pub fn get_partition_stats(&self, partition_id: u32) -> Option<PartitionStats> {
        if partition_id >= self.partition_count {
            return None;
        }

        let entry = &self.partitions[partition_id as usize];

        Some(PartitionStats {
            partition_id,
            event_count: entry.event_count.load(Ordering::Relaxed),
            total_latency_ns: entry.total_latency_ns.load(Ordering::Relaxed),
            write_count: entry.write_count.load(Ordering::Relaxed),
            min_latency_ns: entry.min_latency_ns.load(Ordering::Relaxed),
            max_latency_ns: entry.max_latency_ns.load(Ordering::Relaxed),
            error_count: entry.error_count.load(Ordering::Relaxed),
        })
    }

    pub fn get_all_partition_stats(&self) -> Vec<PartitionStats> {
        (0..self.partition_count)
            .filter_map(|id| self.get_partition_stats(id))
            .collect()
    }
    pub fn detect_partition_imbalance(&self) -> Vec<PartitionImbalanceAlert> {
        let mut alerts = Vec::new();
        let stats = self.get_all_partition_stats();

        let total_events: u64 = stats.iter().map(|s| s.event_count).sum();
        let active_partitions = stats.iter().filter(|s| s.event_count > 0).count();

        if active_partitions == 0 {
            return alerts;
        }

        let average_count = total_events as f64 / active_partitions as f64;
        // Partitions carrying more than 2x the average load are considered hot.
        let imbalance_threshold = 2.0;

        for stat in stats {
            if stat.event_count == 0 {
                continue;
            }

            let ratio = stat.event_count as f64 / average_count;

            if ratio > imbalance_threshold {
                alerts.push(PartitionImbalanceAlert {
                    partition_id: stat.partition_id,
                    event_count: stat.event_count,
                    average_count,
                    ratio_to_average: ratio,
                    message: format!(
                        "Partition {} has {:.1}x average load ({} events vs {:.0} avg)",
                        stat.partition_id, ratio, stat.event_count, average_count
                    ),
                    timestamp: chrono::Utc::now(),
                });
            }
        }

        alerts
    }

    /// Number of partitions being tracked.
    pub fn partition_count(&self) -> u32 {
        self.partition_count
    }

    /// Total events recorded across all partitions.
    pub fn total_events(&self) -> u64 {
        self.partitions
            .iter()
            .map(|e| e.event_count.load(Ordering::Relaxed))
            .sum()
    }

    /// Total errors recorded across all partitions.
    pub fn total_errors(&self) -> u64 {
        self.partitions
            .iter()
            .map(|e| e.error_count.load(Ordering::Relaxed))
            .sum()
    }

    /// Time elapsed since this metrics instance was created.
    pub fn uptime(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Returns the underlying Prometheus registry.
    pub fn registry(&self) -> &Registry {
        &self.registry
    }

    /// Encodes the partition-level metrics in the Prometheus text exposition format.
    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
        use prometheus::Encoder;
        let encoder = prometheus::TextEncoder::new();
        let metric_families = self.registry.gather();
        let mut buffer = Vec::new();
        encoder.encode(&metric_families, &mut buffer)?;
        Ok(String::from_utf8(buffer)?)
    }

    /// Returns a map of partition id to event count.
    pub fn get_distribution(&self) -> HashMap<u32, u64> {
        self.partitions
            .iter()
            .enumerate()
            .map(|(id, entry)| (id as u32, entry.event_count.load(Ordering::Relaxed)))
            .collect()
    }
    pub fn reset(&self) {
        for entry in &self.partitions {
            entry.event_count.store(0, Ordering::Relaxed);
            entry.total_latency_ns.store(0, Ordering::Relaxed);
            entry.write_count.store(0, Ordering::Relaxed);
            entry.min_latency_ns.store(u64::MAX, Ordering::Relaxed);
            entry.max_latency_ns.store(0, Ordering::Relaxed);
            entry.error_count.store(0, Ordering::Relaxed);
        }
    }
}

impl Default for PartitionMetrics {
    fn default() -> Self {
        Self::with_default_partitions()
    }
}

#[cfg(test)]
mod partition_tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_partition_metrics_creation() {
        let metrics = PartitionMetrics::new(32);
        assert_eq!(metrics.partition_count(), 32);
        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    #[test]
    fn test_partition_metrics_default() {
        let metrics = PartitionMetrics::default();
        assert_eq!(metrics.partition_count(), 32);
    }

    #[test]
    fn test_record_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(1, Duration::from_micros(150));

        let stats0 = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats0.event_count, 2);
        assert_eq!(stats0.write_count, 2);

        let stats1 = metrics.get_partition_stats(1).unwrap();
        assert_eq!(stats1.event_count, 1);
    }

    #[test]
    fn test_record_batch_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_batch_write(5, 100, Duration::from_millis(10));

        let stats = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats.event_count, 100);
        assert_eq!(stats.write_count, 1);
    }

    #[test]
    fn test_record_error() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_error(3);
        metrics.record_error(3);
        metrics.record_error(5);

        let stats3 = metrics.get_partition_stats(3).unwrap();
        assert_eq!(stats3.error_count, 2);

        let stats5 = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats5.error_count, 1);

        assert_eq!(metrics.total_errors(), 3);
    }

    #[test]
    fn test_invalid_partition_id() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(100, Duration::from_micros(100));
        metrics.record_error(100);

        assert!(metrics.get_partition_stats(100).is_none());
    }

    #[test]
    fn test_latency_tracking() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(0, Duration::from_micros(300));

        let stats = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats.min_latency_ns, 100_000); // 100 µs
        assert_eq!(stats.max_latency_ns, 300_000); // 300 µs

        let avg = stats.avg_latency().unwrap();
        assert_eq!(avg, Duration::from_nanos(200_000)); // 200 µs average
    }

    #[test]
    fn test_detect_partition_imbalance_no_imbalance() {
        let metrics = PartitionMetrics::new(4);

        for i in 0..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(alerts.is_empty(), "No alerts expected for balanced partitions");
    }

    #[test]
    fn test_detect_partition_imbalance_hot_partition() {
        let metrics = PartitionMetrics::new(4);

        for _ in 0..500 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert_eq!(alerts.len(), 1, "Expected one alert for hot partition");
        assert_eq!(alerts[0].partition_id, 0);
        assert!(alerts[0].ratio_to_average > 2.0);
    }

    #[test]
    fn test_detect_partition_imbalance_empty() {
        let metrics = PartitionMetrics::new(4);

        let alerts = metrics.detect_partition_imbalance();
        assert!(alerts.is_empty(), "No alerts expected for empty metrics");
    }

    #[test]
    fn test_get_all_partition_stats() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(200));

        let all_stats = metrics.get_all_partition_stats();
        assert_eq!(all_stats.len(), 4);
        assert_eq!(all_stats[0].event_count, 1);
        assert_eq!(all_stats[1].event_count, 0);
        assert_eq!(all_stats[2].event_count, 1);
        assert_eq!(all_stats[3].event_count, 0);
    }

    #[test]
    fn test_prometheus_encoding() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(1, Duration::from_micros(200));
        metrics.record_error(0);

        let encoded = metrics.encode().unwrap();

        assert!(encoded.contains("allsource_partition_events_total"));
        assert!(encoded.contains("allsource_partition_write_latency"));
        assert!(encoded.contains("allsource_partition_errors_total"));
    }

    #[test]
    fn test_reset() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_error(1);

        assert_eq!(metrics.total_events(), 1);
        assert_eq!(metrics.total_errors(), 1);

        metrics.reset();

        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    #[test]
    fn test_concurrent_writes() {
        let metrics = Arc::new(PartitionMetrics::new(32));
        let mut handles = vec![];

        for _ in 0..8 {
            let metrics_clone = metrics.clone();
            let handle = thread::spawn(move || {
                for i in 0..1000 {
                    let partition_id = (i % 32) as u32;
                    metrics_clone.record_write(partition_id, Duration::from_micros(100));
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(metrics.total_events(), 8000);
    }

    #[test]
    fn test_get_distribution() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(100));

        let distribution = metrics.get_distribution();

        assert_eq!(distribution.get(&0), Some(&2));
        assert_eq!(distribution.get(&1), Some(&0));
        assert_eq!(distribution.get(&2), Some(&1));
        assert_eq!(distribution.get(&3), Some(&0));
    }

    #[test]
    fn test_partition_stats_avg_latency_none() {
        let stats = PartitionStats::new(0);
        assert!(stats.avg_latency().is_none());
    }

    #[test]
    fn test_alert_message_format() {
        let metrics = PartitionMetrics::new(4);

        for _ in 0..1000 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(!alerts.is_empty());

        let alert = &alerts[0];
        assert!(alert.message.contains("Partition 0"));
        assert!(alert.message.contains("average load"));
    }

    #[test]
    fn test_uptime() {
        let metrics = PartitionMetrics::new(4);

        thread::sleep(Duration::from_millis(10));

        let uptime = metrics.uptime();
        assert!(uptime.as_millis() >= 10);
    }
}
1671}