1use prometheus::{
2 Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
3 Registry,
4};
5use serde::Serialize;
6use std::{
7 collections::HashMap,
8 sync::{
9 Arc,
10 atomic::{AtomicU64, Ordering},
11 },
12 time::{Duration, Instant},
13};
14
/// Bundle of every core Prometheus metric exposed by the service.
///
/// All collectors are registered with the private `registry` created in
/// `MetricsRegistry::new`; `MetricsRegistry::encode` renders that registry in the Prometheus
/// text format. Each field's doc lists the exported metric name (and label names for `*Vec`
/// metrics) exactly as defined in `new`.
pub struct MetricsRegistry {
    /// Private registry owning every collector below.
    registry: Registry,

    // --- Ingestion ---
    /// `allsource_events_ingested_total`.
    pub events_ingested_total: IntCounter,
    /// `allsource_events_ingested_by_type`, labels: `event_type`.
    pub events_ingested_by_type: IntCounterVec,
    /// `allsource_ingestion_duration_seconds` (default buckets).
    pub ingestion_duration_seconds: Histogram,
    /// `allsource_ingestion_errors_total`.
    pub ingestion_errors_total: IntCounter,

    // --- Queries ---
    /// `allsource_queries_total`, labels: `query_type`.
    pub queries_total: IntCounterVec,
    /// `allsource_query_duration_seconds`, labels: `query_type`.
    pub query_duration_seconds: HistogramVec,
    /// `allsource_query_results_total` (events returned by queries), labels: `query_type`.
    pub query_results_total: IntCounterVec,

    // --- Storage ---
    /// `allsource_storage_events_total`.
    pub storage_events_total: IntGauge,
    /// `allsource_storage_entities_total`.
    pub storage_entities_total: IntGauge,
    /// `allsource_storage_size_bytes`.
    pub storage_size_bytes: IntGauge,
    /// `allsource_parquet_files_total`.
    pub parquet_files_total: IntGauge,
    /// `allsource_wal_segments_total`.
    pub wal_segments_total: IntGauge,

    // --- Projections ---
    /// `allsource_projections_total` (registered projections).
    pub projections_total: IntGauge,
    /// `allsource_projection_events_processed`, labels: `projection_name`.
    pub projection_events_processed: IntCounterVec,
    /// `allsource_projection_errors_total`, labels: `projection_name`.
    pub projection_errors_total: IntCounterVec,
    /// `allsource_projection_processing_duration_seconds`, labels: `projection_name`.
    pub projection_processing_duration: HistogramVec,
    /// `allsource_projection_duration_seconds` (overall projection manager duration).
    pub projection_duration_seconds: Histogram,

    // --- Schemas ---
    /// `allsource_schemas_registered_total`.
    pub schemas_registered_total: IntCounter,
    /// `allsource_schema_validations_total`, labels: `subject`, `result`.
    pub schema_validations_total: IntCounterVec,
    /// `allsource_schema_validation_duration_seconds`.
    pub schema_validation_duration: Histogram,

    // --- Replays ---
    /// `allsource_replays_started_total`.
    pub replays_started_total: IntCounter,
    /// `allsource_replays_completed_total`.
    pub replays_completed_total: IntCounter,
    /// `allsource_replays_failed_total`.
    pub replays_failed_total: IntCounter,
    /// `allsource_replay_events_processed`.
    pub replay_events_processed: IntCounter,
    /// `allsource_replay_duration_seconds`.
    pub replay_duration_seconds: Histogram,

    // --- Pipelines ---
    /// `allsource_pipelines_registered_total`.
    pub pipelines_registered_total: IntGauge,
    /// `allsource_pipeline_events_processed`, labels: `pipeline_id`, `pipeline_name`.
    pub pipeline_events_processed: IntCounterVec,
    /// `allsource_pipeline_events_filtered`, labels: `pipeline_id`, `pipeline_name`.
    pub pipeline_events_filtered: IntCounterVec,
    /// `allsource_pipeline_errors_total`, labels: `pipeline_name` only (unlike its siblings).
    pub pipeline_errors_total: IntCounterVec,
    /// `allsource_pipeline_processing_duration_seconds`, labels: `pipeline_id`, `pipeline_name`.
    pub pipeline_processing_duration: HistogramVec,
    /// `allsource_pipeline_duration_seconds` (overall pipeline manager duration).
    pub pipeline_duration_seconds: Histogram,

    // --- Snapshots ---
    /// `allsource_snapshots_created_total`.
    pub snapshots_created_total: IntCounter,
    /// `allsource_snapshot_creation_duration_seconds`.
    pub snapshot_creation_duration: Histogram,
    /// `allsource_snapshots_total`.
    pub snapshots_total: IntGauge,

    // --- Compaction ---
    /// `allsource_compactions_total`.
    pub compactions_total: IntCounter,
    /// `allsource_compaction_duration_seconds`.
    pub compaction_duration_seconds: Histogram,
    /// `allsource_compaction_files_merged`.
    pub compaction_files_merged: IntCounter,
    /// `allsource_compaction_bytes_saved`.
    pub compaction_bytes_saved: IntCounter,

    // --- WebSocket ---
    /// `allsource_websocket_connections_active`.
    pub websocket_connections_active: IntGauge,
    /// `allsource_websocket_connections_total`.
    pub websocket_connections_total: IntCounter,
    /// `allsource_websocket_messages_sent`.
    pub websocket_messages_sent: IntCounter,
    /// `allsource_websocket_errors_total`.
    pub websocket_errors_total: IntCounter,

    // --- HTTP ---
    /// `allsource_http_requests_total`, labels: `method`, `endpoint`, `status`.
    pub http_requests_total: IntCounterVec,
    /// `allsource_http_request_duration_seconds`, labels: `method`, `endpoint`.
    pub http_request_duration_seconds: HistogramVec,
    /// `allsource_http_requests_in_flight`.
    pub http_requests_in_flight: IntGauge,

    // --- Replication (leader side) ---
    /// `allsource_replication_followers_connected`.
    pub replication_followers_connected: IntGauge,
    /// `allsource_replication_wal_shipped_total`.
    pub replication_wal_shipped_total: IntCounter,
    /// `allsource_replication_wal_shipped_bytes_total`.
    pub replication_wal_shipped_bytes_total: IntCounter,
    /// `allsource_replication_follower_lag_seconds`, labels: `follower_id`.
    pub replication_follower_lag_seconds: IntGaugeVec,
    /// `allsource_replication_acks_total`.
    pub replication_acks_total: IntCounter,

    /// `allsource_replication_ack_wait_seconds`; custom buckets from 1 ms to 5 s.
    pub replication_ack_wait_seconds: Histogram,

    // --- Replication (follower side) ---
    /// `allsource_replication_wal_received_total`.
    pub replication_wal_received_total: IntCounter,
    /// `allsource_replication_wal_replayed_total`.
    pub replication_wal_replayed_total: IntCounter,
    /// `allsource_replication_lag_seconds`.
    pub replication_lag_seconds: IntGauge,
    /// `allsource_replication_connected` (1 = connected, 0 = disconnected, per its help text).
    pub replication_connected: IntGauge,
    /// `allsource_replication_reconnects_total`.
    pub replication_reconnects_total: IntCounter,

    // --- Tenant cache (note the `allsource_core_` prefix) ---
    /// `allsource_core_cache_evictions_total`.
    pub cache_evictions_total: IntCounter,
    /// `allsource_core_cache_bytes` (approximate resident bytes).
    pub cache_bytes: IntGauge,

    /// `allsource_core_wal_replay_events_total` (events replayed from the WAL on last boot).
    pub wal_replay_events_total: IntGauge,
}
120
121impl MetricsRegistry {
122 pub fn new() -> Arc<Self> {
123 let registry = Registry::new();
124
125 let events_ingested_total = IntCounter::with_opts(Opts::new(
127 "allsource_events_ingested_total",
128 "Total number of events ingested",
129 ))
130 .unwrap();
131
132 let events_ingested_by_type = IntCounterVec::new(
133 Opts::new(
134 "allsource_events_ingested_by_type",
135 "Events ingested by type",
136 ),
137 &["event_type"],
138 )
139 .unwrap();
140
141 let ingestion_duration_seconds = Histogram::with_opts(HistogramOpts::new(
142 "allsource_ingestion_duration_seconds",
143 "Event ingestion duration in seconds",
144 ))
145 .unwrap();
146
147 let ingestion_errors_total = IntCounter::with_opts(Opts::new(
148 "allsource_ingestion_errors_total",
149 "Total number of ingestion errors",
150 ))
151 .unwrap();
152
153 let queries_total = IntCounterVec::new(
155 Opts::new("allsource_queries_total", "Total number of queries"),
156 &["query_type"],
157 )
158 .unwrap();
159
160 let query_duration_seconds = HistogramVec::new(
161 HistogramOpts::new(
162 "allsource_query_duration_seconds",
163 "Query duration in seconds",
164 ),
165 &["query_type"],
166 )
167 .unwrap();
168
169 let query_results_total = IntCounterVec::new(
170 Opts::new(
171 "allsource_query_results_total",
172 "Total number of events returned by queries",
173 ),
174 &["query_type"],
175 )
176 .unwrap();
177
178 let storage_events_total = IntGauge::with_opts(Opts::new(
180 "allsource_storage_events_total",
181 "Total number of events in storage",
182 ))
183 .unwrap();
184
185 let storage_entities_total = IntGauge::with_opts(Opts::new(
186 "allsource_storage_entities_total",
187 "Total number of entities in storage",
188 ))
189 .unwrap();
190
191 let storage_size_bytes = IntGauge::with_opts(Opts::new(
192 "allsource_storage_size_bytes",
193 "Total storage size in bytes",
194 ))
195 .unwrap();
196
197 let parquet_files_total = IntGauge::with_opts(Opts::new(
198 "allsource_parquet_files_total",
199 "Number of Parquet files",
200 ))
201 .unwrap();
202
203 let wal_segments_total = IntGauge::with_opts(Opts::new(
204 "allsource_wal_segments_total",
205 "Number of WAL segments",
206 ))
207 .unwrap();
208
209 let projection_events_processed = IntCounterVec::new(
211 Opts::new(
212 "allsource_projection_events_processed",
213 "Events processed by projections",
214 ),
215 &["projection_name"],
216 )
217 .unwrap();
218
219 let projection_errors_total = IntCounterVec::new(
220 Opts::new(
221 "allsource_projection_errors_total",
222 "Total projection errors",
223 ),
224 &["projection_name"],
225 )
226 .unwrap();
227
228 let projection_processing_duration = HistogramVec::new(
229 HistogramOpts::new(
230 "allsource_projection_processing_duration_seconds",
231 "Projection processing duration",
232 ),
233 &["projection_name"],
234 )
235 .unwrap();
236
237 let projections_total = IntGauge::with_opts(Opts::new(
238 "allsource_projections_total",
239 "Number of registered projections",
240 ))
241 .unwrap();
242
243 let projection_duration_seconds = Histogram::with_opts(HistogramOpts::new(
244 "allsource_projection_duration_seconds",
245 "Overall projection manager processing duration",
246 ))
247 .unwrap();
248
249 let schemas_registered_total = IntCounter::with_opts(Opts::new(
251 "allsource_schemas_registered_total",
252 "Total number of schemas registered",
253 ))
254 .unwrap();
255
256 let schema_validations_total = IntCounterVec::new(
257 Opts::new(
258 "allsource_schema_validations_total",
259 "Schema validations by result",
260 ),
261 &["subject", "result"],
262 )
263 .unwrap();
264
265 let schema_validation_duration = Histogram::with_opts(HistogramOpts::new(
266 "allsource_schema_validation_duration_seconds",
267 "Schema validation duration",
268 ))
269 .unwrap();
270
271 let replays_started_total = IntCounter::with_opts(Opts::new(
273 "allsource_replays_started_total",
274 "Total replays started",
275 ))
276 .unwrap();
277
278 let replays_completed_total = IntCounter::with_opts(Opts::new(
279 "allsource_replays_completed_total",
280 "Total replays completed",
281 ))
282 .unwrap();
283
284 let replays_failed_total = IntCounter::with_opts(Opts::new(
285 "allsource_replays_failed_total",
286 "Total replays failed",
287 ))
288 .unwrap();
289
290 let replay_events_processed = IntCounter::with_opts(Opts::new(
291 "allsource_replay_events_processed",
292 "Events processed during replays",
293 ))
294 .unwrap();
295
296 let replay_duration_seconds = Histogram::with_opts(HistogramOpts::new(
297 "allsource_replay_duration_seconds",
298 "Replay duration",
299 ))
300 .unwrap();
301
302 let pipelines_registered_total = IntGauge::with_opts(Opts::new(
304 "allsource_pipelines_registered_total",
305 "Number of registered pipelines",
306 ))
307 .unwrap();
308
309 let pipeline_events_processed = IntCounterVec::new(
310 Opts::new(
311 "allsource_pipeline_events_processed",
312 "Events processed by pipelines",
313 ),
314 &["pipeline_id", "pipeline_name"],
315 )
316 .unwrap();
317
318 let pipeline_events_filtered = IntCounterVec::new(
319 Opts::new(
320 "allsource_pipeline_events_filtered",
321 "Events filtered by pipelines",
322 ),
323 &["pipeline_id", "pipeline_name"],
324 )
325 .unwrap();
326
327 let pipeline_processing_duration = HistogramVec::new(
328 HistogramOpts::new(
329 "allsource_pipeline_processing_duration_seconds",
330 "Pipeline processing duration",
331 ),
332 &["pipeline_id", "pipeline_name"],
333 )
334 .unwrap();
335
336 let pipeline_errors_total = IntCounterVec::new(
337 Opts::new("allsource_pipeline_errors_total", "Total pipeline errors"),
338 &["pipeline_name"],
339 )
340 .unwrap();
341
342 let pipeline_duration_seconds = Histogram::with_opts(HistogramOpts::new(
343 "allsource_pipeline_duration_seconds",
344 "Overall pipeline manager processing duration",
345 ))
346 .unwrap();
347
348 let snapshots_created_total = IntCounter::with_opts(Opts::new(
350 "allsource_snapshots_created_total",
351 "Total snapshots created",
352 ))
353 .unwrap();
354
355 let snapshot_creation_duration = Histogram::with_opts(HistogramOpts::new(
356 "allsource_snapshot_creation_duration_seconds",
357 "Snapshot creation duration",
358 ))
359 .unwrap();
360
361 let snapshots_total = IntGauge::with_opts(Opts::new(
362 "allsource_snapshots_total",
363 "Total number of snapshots",
364 ))
365 .unwrap();
366
367 let compactions_total = IntCounter::with_opts(Opts::new(
369 "allsource_compactions_total",
370 "Total compactions performed",
371 ))
372 .unwrap();
373
374 let compaction_duration_seconds = Histogram::with_opts(HistogramOpts::new(
375 "allsource_compaction_duration_seconds",
376 "Compaction duration",
377 ))
378 .unwrap();
379
380 let compaction_files_merged = IntCounter::with_opts(Opts::new(
381 "allsource_compaction_files_merged",
382 "Files merged during compaction",
383 ))
384 .unwrap();
385
386 let compaction_bytes_saved = IntCounter::with_opts(Opts::new(
387 "allsource_compaction_bytes_saved",
388 "Bytes saved by compaction",
389 ))
390 .unwrap();
391
392 let cache_evictions_total = IntCounter::with_opts(Opts::new(
394 "allsource_core_cache_evictions_total",
395 "Total tenant evictions from the in-memory cache",
396 ))
397 .unwrap();
398
399 let cache_bytes = IntGauge::with_opts(Opts::new(
400 "allsource_core_cache_bytes",
401 "Approximate resident bytes in the in-memory tenant cache",
402 ))
403 .unwrap();
404
405 let wal_replay_events_total = IntGauge::with_opts(Opts::new(
407 "allsource_core_wal_replay_events_total",
408 "Number of events replayed from the WAL on the most recent boot",
409 ))
410 .unwrap();
411
412 let websocket_connections_active = IntGauge::with_opts(Opts::new(
414 "allsource_websocket_connections_active",
415 "Active WebSocket connections",
416 ))
417 .unwrap();
418
419 let websocket_connections_total = IntCounter::with_opts(Opts::new(
420 "allsource_websocket_connections_total",
421 "Total WebSocket connections",
422 ))
423 .unwrap();
424
425 let websocket_messages_sent = IntCounter::with_opts(Opts::new(
426 "allsource_websocket_messages_sent",
427 "WebSocket messages sent",
428 ))
429 .unwrap();
430
431 let websocket_errors_total = IntCounter::with_opts(Opts::new(
432 "allsource_websocket_errors_total",
433 "WebSocket errors",
434 ))
435 .unwrap();
436
437 let http_requests_total = IntCounterVec::new(
439 Opts::new("allsource_http_requests_total", "Total HTTP requests"),
440 &["method", "endpoint", "status"],
441 )
442 .unwrap();
443
444 let http_request_duration_seconds = HistogramVec::new(
445 HistogramOpts::new(
446 "allsource_http_request_duration_seconds",
447 "HTTP request duration",
448 ),
449 &["method", "endpoint"],
450 )
451 .unwrap();
452
453 let http_requests_in_flight = IntGauge::with_opts(Opts::new(
454 "allsource_http_requests_in_flight",
455 "HTTP requests currently being processed",
456 ))
457 .unwrap();
458
459 let replication_followers_connected = IntGauge::with_opts(Opts::new(
461 "allsource_replication_followers_connected",
462 "Number of connected followers",
463 ))
464 .unwrap();
465
466 let replication_wal_shipped_total = IntCounter::with_opts(Opts::new(
467 "allsource_replication_wal_shipped_total",
468 "Total WAL entries shipped to followers",
469 ))
470 .unwrap();
471
472 let replication_wal_shipped_bytes_total = IntCounter::with_opts(Opts::new(
473 "allsource_replication_wal_shipped_bytes_total",
474 "Total bytes shipped to followers",
475 ))
476 .unwrap();
477
478 let replication_follower_lag_seconds = IntGaugeVec::new(
479 Opts::new(
480 "allsource_replication_follower_lag_seconds",
481 "Per-follower replication lag in seconds",
482 ),
483 &["follower_id"],
484 )
485 .unwrap();
486
487 let replication_acks_total = IntCounter::with_opts(Opts::new(
488 "allsource_replication_acks_total",
489 "Total ACKs received from followers",
490 ))
491 .unwrap();
492
493 let replication_ack_wait_seconds = Histogram::with_opts(
494 HistogramOpts::new(
495 "allsource_replication_ack_wait_seconds",
496 "Time spent waiting for follower ACKs in semi-sync/sync mode",
497 )
498 .buckets(vec![
499 0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
500 ]),
501 )
502 .unwrap();
503
504 let replication_wal_received_total = IntCounter::with_opts(Opts::new(
506 "allsource_replication_wal_received_total",
507 "Total WAL entries received from leader",
508 ))
509 .unwrap();
510
511 let replication_wal_replayed_total = IntCounter::with_opts(Opts::new(
512 "allsource_replication_wal_replayed_total",
513 "Total WAL entries replayed into DashMap",
514 ))
515 .unwrap();
516
517 let replication_lag_seconds = IntGauge::with_opts(Opts::new(
518 "allsource_replication_lag_seconds",
519 "Replication lag behind leader in seconds",
520 ))
521 .unwrap();
522
523 let replication_connected = IntGauge::with_opts(Opts::new(
524 "allsource_replication_connected",
525 "Whether connected to leader (1=connected, 0=disconnected)",
526 ))
527 .unwrap();
528
529 let replication_reconnects_total = IntCounter::with_opts(Opts::new(
530 "allsource_replication_reconnects_total",
531 "Total reconnection attempts to leader",
532 ))
533 .unwrap();
534
535 registry
537 .register(Box::new(events_ingested_total.clone()))
538 .unwrap();
539 registry
540 .register(Box::new(events_ingested_by_type.clone()))
541 .unwrap();
542 registry
543 .register(Box::new(ingestion_duration_seconds.clone()))
544 .unwrap();
545 registry
546 .register(Box::new(ingestion_errors_total.clone()))
547 .unwrap();
548
549 registry.register(Box::new(queries_total.clone())).unwrap();
550 registry
551 .register(Box::new(query_duration_seconds.clone()))
552 .unwrap();
553 registry
554 .register(Box::new(query_results_total.clone()))
555 .unwrap();
556
557 registry
558 .register(Box::new(storage_events_total.clone()))
559 .unwrap();
560 registry
561 .register(Box::new(storage_entities_total.clone()))
562 .unwrap();
563 registry
564 .register(Box::new(storage_size_bytes.clone()))
565 .unwrap();
566 registry
567 .register(Box::new(parquet_files_total.clone()))
568 .unwrap();
569 registry
570 .register(Box::new(wal_segments_total.clone()))
571 .unwrap();
572
573 registry
574 .register(Box::new(projection_events_processed.clone()))
575 .unwrap();
576 registry
577 .register(Box::new(projection_errors_total.clone()))
578 .unwrap();
579 registry
580 .register(Box::new(projection_processing_duration.clone()))
581 .unwrap();
582 registry
583 .register(Box::new(projections_total.clone()))
584 .unwrap();
585 registry
586 .register(Box::new(projection_duration_seconds.clone()))
587 .unwrap();
588
589 registry
590 .register(Box::new(schemas_registered_total.clone()))
591 .unwrap();
592 registry
593 .register(Box::new(schema_validations_total.clone()))
594 .unwrap();
595 registry
596 .register(Box::new(schema_validation_duration.clone()))
597 .unwrap();
598
599 registry
600 .register(Box::new(replays_started_total.clone()))
601 .unwrap();
602 registry
603 .register(Box::new(replays_completed_total.clone()))
604 .unwrap();
605 registry
606 .register(Box::new(replays_failed_total.clone()))
607 .unwrap();
608 registry
609 .register(Box::new(replay_events_processed.clone()))
610 .unwrap();
611 registry
612 .register(Box::new(replay_duration_seconds.clone()))
613 .unwrap();
614
615 registry
616 .register(Box::new(pipelines_registered_total.clone()))
617 .unwrap();
618 registry
619 .register(Box::new(pipeline_events_processed.clone()))
620 .unwrap();
621 registry
622 .register(Box::new(pipeline_events_filtered.clone()))
623 .unwrap();
624 registry
625 .register(Box::new(pipeline_processing_duration.clone()))
626 .unwrap();
627 registry
628 .register(Box::new(pipeline_errors_total.clone()))
629 .unwrap();
630 registry
631 .register(Box::new(pipeline_duration_seconds.clone()))
632 .unwrap();
633
634 registry
635 .register(Box::new(snapshots_created_total.clone()))
636 .unwrap();
637 registry
638 .register(Box::new(snapshot_creation_duration.clone()))
639 .unwrap();
640 registry
641 .register(Box::new(snapshots_total.clone()))
642 .unwrap();
643
644 registry
645 .register(Box::new(compactions_total.clone()))
646 .unwrap();
647 registry
648 .register(Box::new(compaction_duration_seconds.clone()))
649 .unwrap();
650 registry
651 .register(Box::new(compaction_files_merged.clone()))
652 .unwrap();
653 registry
654 .register(Box::new(compaction_bytes_saved.clone()))
655 .unwrap();
656
657 registry
658 .register(Box::new(cache_evictions_total.clone()))
659 .unwrap();
660 registry.register(Box::new(cache_bytes.clone())).unwrap();
661 registry
662 .register(Box::new(wal_replay_events_total.clone()))
663 .unwrap();
664
665 registry
666 .register(Box::new(websocket_connections_active.clone()))
667 .unwrap();
668 registry
669 .register(Box::new(websocket_connections_total.clone()))
670 .unwrap();
671 registry
672 .register(Box::new(websocket_messages_sent.clone()))
673 .unwrap();
674 registry
675 .register(Box::new(websocket_errors_total.clone()))
676 .unwrap();
677
678 registry
679 .register(Box::new(http_requests_total.clone()))
680 .unwrap();
681 registry
682 .register(Box::new(http_request_duration_seconds.clone()))
683 .unwrap();
684 registry
685 .register(Box::new(http_requests_in_flight.clone()))
686 .unwrap();
687
688 registry
689 .register(Box::new(replication_followers_connected.clone()))
690 .unwrap();
691 registry
692 .register(Box::new(replication_wal_shipped_total.clone()))
693 .unwrap();
694 registry
695 .register(Box::new(replication_wal_shipped_bytes_total.clone()))
696 .unwrap();
697 registry
698 .register(Box::new(replication_follower_lag_seconds.clone()))
699 .unwrap();
700 registry
701 .register(Box::new(replication_acks_total.clone()))
702 .unwrap();
703 registry
704 .register(Box::new(replication_ack_wait_seconds.clone()))
705 .unwrap();
706 registry
707 .register(Box::new(replication_wal_received_total.clone()))
708 .unwrap();
709 registry
710 .register(Box::new(replication_wal_replayed_total.clone()))
711 .unwrap();
712 registry
713 .register(Box::new(replication_lag_seconds.clone()))
714 .unwrap();
715 registry
716 .register(Box::new(replication_connected.clone()))
717 .unwrap();
718 registry
719 .register(Box::new(replication_reconnects_total.clone()))
720 .unwrap();
721
722 Arc::new(Self {
723 registry,
724 events_ingested_total,
725 events_ingested_by_type,
726 ingestion_duration_seconds,
727 ingestion_errors_total,
728 queries_total,
729 query_duration_seconds,
730 query_results_total,
731 storage_events_total,
732 storage_entities_total,
733 storage_size_bytes,
734 parquet_files_total,
735 wal_segments_total,
736 projections_total,
737 projection_events_processed,
738 projection_errors_total,
739 projection_processing_duration,
740 projection_duration_seconds,
741 schemas_registered_total,
742 schema_validations_total,
743 schema_validation_duration,
744 replays_started_total,
745 replays_completed_total,
746 replays_failed_total,
747 replay_events_processed,
748 replay_duration_seconds,
749 pipelines_registered_total,
750 pipeline_events_processed,
751 pipeline_events_filtered,
752 pipeline_errors_total,
753 pipeline_processing_duration,
754 pipeline_duration_seconds,
755 snapshots_created_total,
756 snapshot_creation_duration,
757 snapshots_total,
758 compactions_total,
759 compaction_duration_seconds,
760 compaction_files_merged,
761 compaction_bytes_saved,
762 websocket_connections_active,
763 websocket_connections_total,
764 websocket_messages_sent,
765 websocket_errors_total,
766 http_requests_total,
767 http_request_duration_seconds,
768 http_requests_in_flight,
769 replication_followers_connected,
770 replication_wal_shipped_total,
771 replication_wal_shipped_bytes_total,
772 replication_follower_lag_seconds,
773 replication_acks_total,
774 replication_ack_wait_seconds,
775 replication_wal_received_total,
776 replication_wal_replayed_total,
777 replication_lag_seconds,
778 replication_connected,
779 replication_reconnects_total,
780 cache_evictions_total,
781 cache_bytes,
782 wal_replay_events_total,
783 })
784 }
785
786 pub fn registry(&self) -> &Registry {
788 &self.registry
789 }
790
791 pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
793 use prometheus::Encoder;
794 let encoder = prometheus::TextEncoder::new();
795 let metric_families = self.registry.gather();
796 let mut buffer = Vec::new();
797 encoder.encode(&metric_families, &mut buffer)?;
798 Ok(String::from_utf8(buffer)?)
799 }
800}
801
#[cfg(test)]
mod tests {
    use super::*;

    // Each test builds its own `MetricsRegistry`, which owns a private `Registry`, so tests
    // never observe each other's metric values.

    #[test]
    fn test_metrics_registry_creation() {
        let metrics = MetricsRegistry::new();
        assert_eq!(metrics.events_ingested_total.get(), 0);
        assert_eq!(metrics.storage_events_total.get(), 0);
        // Unlabelled metrics are gathered even before they are touched.
        assert!(!metrics.registry().gather().is_empty());
    }

    #[test]
    fn test_event_ingestion_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.events_ingested_total.inc();
        assert_eq!(metrics.events_ingested_total.get(), 1);

        metrics
            .events_ingested_by_type
            .with_label_values(&["user.created"])
            .inc();
        assert_eq!(
            metrics
                .events_ingested_by_type
                .with_label_values(&["user.created"])
                .get(),
            1
        );

        metrics.ingestion_duration_seconds.observe(0.1);
        assert_eq!(metrics.ingestion_duration_seconds.get_sample_count(), 1);

        metrics.ingestion_errors_total.inc();
        assert_eq!(metrics.ingestion_errors_total.get(), 1);
    }

    #[test]
    fn test_query_metrics() {
        let metrics = MetricsRegistry::new();

        metrics
            .queries_total
            .with_label_values(&["entity_id"])
            .inc();
        assert_eq!(
            metrics
                .queries_total
                .with_label_values(&["entity_id"])
                .get(),
            1
        );

        metrics
            .query_duration_seconds
            .with_label_values(&["entity_id"])
            .observe(0.05);
        assert_eq!(
            metrics
                .query_duration_seconds
                .with_label_values(&["entity_id"])
                .get_sample_count(),
            1
        );

        metrics
            .query_results_total
            .with_label_values(&["entity_id"])
            .inc_by(10);
        assert_eq!(
            metrics
                .query_results_total
                .with_label_values(&["entity_id"])
                .get(),
            10
        );
    }

    #[test]
    fn test_storage_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.storage_events_total.set(1000);
        assert_eq!(metrics.storage_events_total.get(), 1000);

        metrics.storage_entities_total.set(50);
        assert_eq!(metrics.storage_entities_total.get(), 50);

        metrics.storage_size_bytes.set(1024 * 1024);
        assert_eq!(metrics.storage_size_bytes.get(), 1024 * 1024);

        metrics.parquet_files_total.set(5);
        assert_eq!(metrics.parquet_files_total.get(), 5);

        metrics.wal_segments_total.set(3);
        assert_eq!(metrics.wal_segments_total.get(), 3);
    }

    #[test]
    fn test_projection_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.projections_total.set(3);
        assert_eq!(metrics.projections_total.get(), 3);

        metrics
            .projection_events_processed
            .with_label_values(&["user_snapshot"])
            .inc_by(100);
        assert_eq!(
            metrics
                .projection_events_processed
                .with_label_values(&["user_snapshot"])
                .get(),
            100
        );

        metrics
            .projection_processing_duration
            .with_label_values(&["user_snapshot"])
            .observe(0.2);
        assert_eq!(
            metrics
                .projection_processing_duration
                .with_label_values(&["user_snapshot"])
                .get_sample_count(),
            1
        );

        metrics
            .projection_errors_total
            .with_label_values(&["user_snapshot"])
            .inc();
        assert_eq!(
            metrics
                .projection_errors_total
                .with_label_values(&["user_snapshot"])
                .get(),
            1
        );

        metrics.projection_duration_seconds.observe(0.3);
        assert_eq!(metrics.projection_duration_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_schema_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.schemas_registered_total.inc();
        assert_eq!(metrics.schemas_registered_total.get(), 1);

        metrics
            .schema_validations_total
            .with_label_values(&["user.schema", "success"])
            .inc();

        metrics
            .schema_validations_total
            .with_label_values(&["order.schema", "failure"])
            .inc();

        // Distinct label combinations are tracked independently.
        assert_eq!(
            metrics
                .schema_validations_total
                .with_label_values(&["user.schema", "success"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .schema_validations_total
                .with_label_values(&["order.schema", "failure"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .schema_validations_total
                .with_label_values(&["user.schema", "failure"])
                .get(),
            0
        );

        metrics.schema_validation_duration.observe(0.01);
        assert_eq!(metrics.schema_validation_duration.get_sample_count(), 1);
    }

    #[test]
    fn test_replay_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.replays_started_total.inc();
        assert_eq!(metrics.replays_started_total.get(), 1);

        metrics.replay_events_processed.inc_by(500);
        assert_eq!(metrics.replay_events_processed.get(), 500);

        metrics.replays_completed_total.inc();
        assert_eq!(metrics.replays_completed_total.get(), 1);

        metrics.replays_failed_total.inc();
        assert_eq!(metrics.replays_failed_total.get(), 1);

        metrics.replay_duration_seconds.observe(5.5);
        assert_eq!(metrics.replay_duration_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_pipeline_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.pipelines_registered_total.set(2);
        assert_eq!(metrics.pipelines_registered_total.get(), 2);

        metrics
            .pipeline_events_processed
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .inc_by(250);
        assert_eq!(
            metrics
                .pipeline_events_processed
                .with_label_values(&["pipeline-1", "filter_pipeline"])
                .get(),
            250
        );

        metrics
            .pipeline_events_filtered
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .inc_by(7);
        assert_eq!(
            metrics
                .pipeline_events_filtered
                .with_label_values(&["pipeline-1", "filter_pipeline"])
                .get(),
            7
        );

        // `pipeline_errors_total` is labelled by pipeline name only.
        metrics
            .pipeline_errors_total
            .with_label_values(&["filter_pipeline"])
            .inc();
        assert_eq!(
            metrics
                .pipeline_errors_total
                .with_label_values(&["filter_pipeline"])
                .get(),
            1
        );

        metrics
            .pipeline_processing_duration
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .observe(0.15);
        assert_eq!(
            metrics
                .pipeline_processing_duration
                .with_label_values(&["pipeline-1", "filter_pipeline"])
                .get_sample_count(),
            1
        );

        metrics.pipeline_duration_seconds.observe(0.2);
        assert_eq!(metrics.pipeline_duration_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_metrics_encode() {
        let metrics = MetricsRegistry::new();

        metrics.events_ingested_total.inc_by(100);
        metrics.storage_events_total.set(1000);

        let encoded = metrics.encode().unwrap();

        assert!(encoded.contains("events_ingested_total"));
        assert!(encoded.contains("storage_events_total"));
        assert!(encoded.contains("# HELP allsource_events_ingested_total"));
    }

    #[test]
    fn test_metrics_default() {
        // There is no `Default` impl; this checks the initial state produced by `new`.
        let metrics = MetricsRegistry::new();
        assert_eq!(metrics.events_ingested_total.get(), 0);
        assert_eq!(metrics.http_requests_in_flight.get(), 0);
        assert_eq!(metrics.websocket_connections_active.get(), 0);
        assert_eq!(metrics.replication_connected.get(), 0);
    }

    #[test]
    fn test_websocket_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.websocket_connections_active.inc();
        assert_eq!(metrics.websocket_connections_active.get(), 1);

        metrics.websocket_connections_total.inc();
        assert_eq!(metrics.websocket_connections_total.get(), 1);

        metrics.websocket_messages_sent.inc_by(10);
        assert_eq!(metrics.websocket_messages_sent.get(), 10);

        metrics.websocket_connections_active.dec();
        assert_eq!(metrics.websocket_connections_active.get(), 0);

        metrics.websocket_errors_total.inc();
        assert_eq!(metrics.websocket_errors_total.get(), 1);
    }

    #[test]
    fn test_compaction_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.compactions_total.inc();
        assert_eq!(metrics.compactions_total.get(), 1);

        metrics.compaction_duration_seconds.observe(5.2);
        assert_eq!(metrics.compaction_duration_seconds.get_sample_count(), 1);

        metrics.compaction_files_merged.inc_by(5);
        assert_eq!(metrics.compaction_files_merged.get(), 5);

        metrics.compaction_bytes_saved.inc_by(1024 * 1024);
        assert_eq!(metrics.compaction_bytes_saved.get(), 1024 * 1024);
    }

    #[test]
    fn test_snapshot_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.snapshots_created_total.inc();
        assert_eq!(metrics.snapshots_created_total.get(), 1);

        metrics.snapshot_creation_duration.observe(0.5);
        assert_eq!(metrics.snapshot_creation_duration.get_sample_count(), 1);

        metrics.snapshots_total.set(10);
        assert_eq!(metrics.snapshots_total.get(), 10);
    }

    #[test]
    fn test_replication_leader_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.replication_followers_connected.set(2);
        assert_eq!(metrics.replication_followers_connected.get(), 2);

        metrics.replication_wal_shipped_total.inc_by(100);
        assert_eq!(metrics.replication_wal_shipped_total.get(), 100);

        metrics
            .replication_wal_shipped_bytes_total
            .inc_by(1024 * 1024);
        assert_eq!(
            metrics.replication_wal_shipped_bytes_total.get(),
            1024 * 1024
        );

        metrics
            .replication_follower_lag_seconds
            .with_label_values(&["follower-1"])
            .set(5);
        assert_eq!(
            metrics
                .replication_follower_lag_seconds
                .with_label_values(&["follower-1"])
                .get(),
            5
        );

        metrics.replication_acks_total.inc_by(50);
        assert_eq!(metrics.replication_acks_total.get(), 50);

        metrics.replication_ack_wait_seconds.observe(0.003);
        assert_eq!(metrics.replication_ack_wait_seconds.get_sample_count(), 1);
    }

    #[test]
    fn test_replication_follower_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.replication_wal_received_total.inc_by(200);
        assert_eq!(metrics.replication_wal_received_total.get(), 200);

        metrics.replication_wal_replayed_total.inc_by(195);
        assert_eq!(metrics.replication_wal_replayed_total.get(), 195);

        metrics.replication_lag_seconds.set(3);
        assert_eq!(metrics.replication_lag_seconds.get(), 3);

        metrics.replication_connected.set(1);
        assert_eq!(metrics.replication_connected.get(), 1);

        metrics.replication_reconnects_total.inc_by(2);
        assert_eq!(metrics.replication_reconnects_total.get(), 2);
    }

    #[test]
    fn test_replication_metrics_in_encode() {
        let metrics = MetricsRegistry::new();

        metrics.replication_followers_connected.set(1);
        metrics.replication_wal_shipped_total.inc();
        metrics.replication_connected.set(1);

        let encoded = metrics.encode().unwrap();
        assert!(encoded.contains("allsource_replication_followers_connected"));
        assert!(encoded.contains("allsource_replication_wal_shipped_total"));
        assert!(encoded.contains("allsource_replication_connected"));
        assert!(encoded.contains("allsource_replication_ack_wait_seconds"));
    }

    #[test]
    fn test_cache_and_wal_metrics() {
        let metrics = MetricsRegistry::new();

        metrics.cache_evictions_total.inc_by(3);
        assert_eq!(metrics.cache_evictions_total.get(), 3);

        metrics.cache_bytes.set(4096);
        assert_eq!(metrics.cache_bytes.get(), 4096);

        metrics.wal_replay_events_total.set(42);
        assert_eq!(metrics.wal_replay_events_total.get(), 42);

        // These use the `allsource_core_` prefix rather than plain `allsource_`.
        let encoded = metrics.encode().unwrap();
        assert!(encoded.contains("allsource_core_cache_evictions_total"));
        assert!(encoded.contains("allsource_core_cache_bytes"));
        assert!(encoded.contains("allsource_core_wal_replay_events_total"));
    }

    #[test]
    fn test_http_metrics() {
        let metrics = MetricsRegistry::new();

        metrics
            .http_requests_total
            .with_label_values(&["GET", "/api/events", "200"])
            .inc();
        assert_eq!(
            metrics
                .http_requests_total
                .with_label_values(&["GET", "/api/events", "200"])
                .get(),
            1
        );

        metrics
            .http_request_duration_seconds
            .with_label_values(&["GET", "/api/events"])
            .observe(0.025);
        assert_eq!(
            metrics
                .http_request_duration_seconds
                .with_label_values(&["GET", "/api/events"])
                .get_sample_count(),
            1
        );

        metrics.http_requests_in_flight.inc();
        assert_eq!(metrics.http_requests_in_flight.get(), 1);

        metrics.http_requests_in_flight.dec();
        assert_eq!(metrics.http_requests_in_flight.get(), 0);
    }
}
1169
/// Plain-value write statistics for a single partition.
///
/// `new` starts every counter at zero and `min_latency_ns` at `u64::MAX`, so the minimum
/// slot holds that sentinel until some latency lower than it is stored.
#[derive(Debug)]
pub struct PartitionStats {
    /// Index of the partition these numbers describe.
    pub partition_id: u32,

    /// Number of events counted for the partition.
    pub event_count: u64,

    /// Sum of recorded write latencies in nanoseconds; divided by `write_count` in
    /// [`PartitionStats::avg_latency`].
    pub total_latency_ns: u64,

    /// Number of writes contributing to `total_latency_ns`.
    pub write_count: u64,

    /// Smallest recorded write latency in nanoseconds (`u64::MAX` when freshly created).
    pub min_latency_ns: u64,

    /// Largest recorded write latency in nanoseconds (`0` when freshly created).
    pub max_latency_ns: u64,

    /// Number of errors counted for the partition.
    pub error_count: u64,
}

impl PartitionStats {
    /// Returns empty statistics for `partition_id`.
    fn new(partition_id: u32) -> Self {
        Self {
            partition_id,
            min_latency_ns: u64::MAX,
            max_latency_ns: 0,
            event_count: 0,
            total_latency_ns: 0,
            write_count: 0,
            error_count: 0,
        }
    }

    /// Mean write latency, truncated to whole nanoseconds.
    ///
    /// Returns `None` when `write_count` is zero, i.e. no write has been recorded.
    pub fn avg_latency(&self) -> Option<Duration> {
        if self.write_count == 0 {
            return None;
        }
        let mean_ns = self.total_latency_ns / self.write_count;
        Some(Duration::from_nanos(mean_ns))
    }
}
1222
/// Serializable report comparing one partition's event count with an average count.
///
/// NOTE(review): the code that decides when to build this alert is not visible here; the
/// field descriptions below that mention how values are derived are assumptions to confirm.
#[derive(Debug, Clone, Serialize)]
pub struct PartitionImbalanceAlert {
    /// Index of the partition the alert refers to.
    pub partition_id: u32,

    /// Event count observed for this partition.
    pub event_count: u64,

    /// Average event count the partition is compared against (presumably the mean across
    /// all partitions — confirm at the construction site).
    pub average_count: f64,

    /// Presumably `event_count / average_count` — confirm at the construction site.
    pub ratio_to_average: f64,

    /// Human-readable description of the alert.
    pub message: String,

    /// UTC timestamp attached to the alert (presumably its creation time).
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
1244
/// Per-partition accumulators held by `PartitionMetrics`, one entry per partition.
///
/// Every field is an [`AtomicU64`], so an entry can be updated through a shared reference.
/// `min_latency_ns` starts at `u64::MAX` so that any real latency compares lower than it;
/// every other field starts at zero.
struct PartitionMetricsEntry {
    event_count: AtomicU64,
    total_latency_ns: AtomicU64,
    write_count: AtomicU64,
    min_latency_ns: AtomicU64,
    max_latency_ns: AtomicU64,
    error_count: AtomicU64,
}

impl PartitionMetricsEntry {
    /// Returns an entry with all counters zeroed and the minimum-latency slot at `u64::MAX`.
    fn new() -> Self {
        let zeroed = || AtomicU64::new(0);
        Self {
            min_latency_ns: AtomicU64::new(u64::MAX),
            max_latency_ns: zeroed(),
            event_count: zeroed(),
            total_latency_ns: zeroed(),
            write_count: zeroed(),
            error_count: zeroed(),
        }
    }
}
1267
/// Per-partition write and error statistics.
///
/// Combines atomic accumulators (one `PartitionMetricsEntry` per partition) with Prometheus
/// collectors labelled by `partition_id`. The collectors live in this struct's own
/// `Registry`, created in `PartitionMetrics::new` and separate from `MetricsRegistry`'s.
pub struct PartitionMetrics {
    /// Number of partitions tracked; `record_write` ignores ids `>= partition_count`.
    partition_count: u32,

    /// One entry per partition, indexed by partition id (`new` creates `partition_count`).
    partitions: Vec<PartitionMetricsEntry>,

    /// `allsource_partition_events_total`, labels: `partition_id`.
    partition_events_total: IntGaugeVec,

    /// `allsource_partition_write_latency_seconds`, labels: `partition_id`; custom buckets
    /// from 0.1 ms to 1 s.
    partition_write_latency: HistogramVec,

    /// `allsource_partition_errors_total`, labels: `partition_id`.
    partition_errors_total: IntCounterVec,

    /// Registry holding the three partition collectors above.
    registry: Registry,

    /// Instant captured when the struct was constructed.
    started_at: Instant,
}
1323
1324impl PartitionMetrics {
1325 pub fn new(partition_count: u32) -> Self {
1330 let registry = Registry::new();
1331
1332 let partition_events_total = IntGaugeVec::new(
1334 Opts::new(
1335 "allsource_partition_events_total",
1336 "Total events per partition",
1337 ),
1338 &["partition_id"],
1339 )
1340 .expect("Failed to create partition_events_total metric");
1341
1342 let partition_write_latency = HistogramVec::new(
1344 HistogramOpts::new(
1345 "allsource_partition_write_latency_seconds",
1346 "Write latency per partition in seconds",
1347 )
1348 .buckets(vec![
1349 0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
1350 ]),
1351 &["partition_id"],
1352 )
1353 .expect("Failed to create partition_write_latency metric");
1354
1355 let partition_errors_total = IntCounterVec::new(
1357 Opts::new(
1358 "allsource_partition_errors_total",
1359 "Total errors per partition",
1360 ),
1361 &["partition_id"],
1362 )
1363 .expect("Failed to create partition_errors_total metric");
1364
1365 registry
1367 .register(Box::new(partition_events_total.clone()))
1368 .expect("Failed to register partition_events_total");
1369 registry
1370 .register(Box::new(partition_write_latency.clone()))
1371 .expect("Failed to register partition_write_latency");
1372 registry
1373 .register(Box::new(partition_errors_total.clone()))
1374 .expect("Failed to register partition_errors_total");
1375
1376 let partitions = (0..partition_count)
1378 .map(|_| PartitionMetricsEntry::new())
1379 .collect();
1380
1381 Self {
1382 partition_count,
1383 partitions,
1384 partition_events_total,
1385 partition_write_latency,
1386 partition_errors_total,
1387 registry,
1388 started_at: Instant::now(),
1389 }
1390 }
1391
1392 pub fn with_default_partitions() -> Self {
1394 Self::new(32)
1395 }
1396
1397 #[inline]
1403 pub fn record_write(&self, partition_id: u32, latency: Duration) {
1404 if partition_id >= self.partition_count {
1405 return;
1406 }
1407
1408 let entry = &self.partitions[partition_id as usize];
1409 let latency_ns = latency.as_nanos() as u64;
1410
1411 entry.event_count.fetch_add(1, Ordering::Relaxed);
1413 entry.write_count.fetch_add(1, Ordering::Relaxed);
1414 entry
1415 .total_latency_ns
1416 .fetch_add(latency_ns, Ordering::Relaxed);
1417
1418 let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1420 while latency_ns < current_min {
1421 match entry.min_latency_ns.compare_exchange_weak(
1422 current_min,
1423 latency_ns,
1424 Ordering::Relaxed,
1425 Ordering::Relaxed,
1426 ) {
1427 Ok(_) => break,
1428 Err(actual) => current_min = actual,
1429 }
1430 }
1431
1432 let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1434 while latency_ns > current_max {
1435 match entry.max_latency_ns.compare_exchange_weak(
1436 current_max,
1437 latency_ns,
1438 Ordering::Relaxed,
1439 Ordering::Relaxed,
1440 ) {
1441 Ok(_) => break,
1442 Err(actual) => current_max = actual,
1443 }
1444 }
1445
1446 let partition_id_str = partition_id.to_string();
1448 self.partition_events_total
1449 .with_label_values(&[&partition_id_str])
1450 .set(entry.event_count.load(Ordering::Relaxed) as i64);
1451 self.partition_write_latency
1452 .with_label_values(&[&partition_id_str])
1453 .observe(latency.as_secs_f64());
1454 }
1455
1456 #[inline]
1461 pub fn record_error(&self, partition_id: u32) {
1462 if partition_id >= self.partition_count {
1463 return;
1464 }
1465
1466 let entry = &self.partitions[partition_id as usize];
1467 entry.error_count.fetch_add(1, Ordering::Relaxed);
1468
1469 let partition_id_str = partition_id.to_string();
1471 self.partition_errors_total
1472 .with_label_values(&[&partition_id_str])
1473 .inc();
1474 }
1475
1476 #[inline]
1483 pub fn record_batch_write(&self, partition_id: u32, count: u64, latency: Duration) {
1484 if partition_id >= self.partition_count {
1485 return;
1486 }
1487
1488 let entry = &self.partitions[partition_id as usize];
1489 let latency_ns = latency.as_nanos() as u64;
1490
1491 entry.event_count.fetch_add(count, Ordering::Relaxed);
1493 entry.write_count.fetch_add(1, Ordering::Relaxed);
1494 entry
1495 .total_latency_ns
1496 .fetch_add(latency_ns, Ordering::Relaxed);
1497
1498 let per_event_latency_ns = latency_ns / count.max(1);
1500
1501 let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1503 while per_event_latency_ns < current_min {
1504 match entry.min_latency_ns.compare_exchange_weak(
1505 current_min,
1506 per_event_latency_ns,
1507 Ordering::Relaxed,
1508 Ordering::Relaxed,
1509 ) {
1510 Ok(_) => break,
1511 Err(actual) => current_min = actual,
1512 }
1513 }
1514
1515 let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1517 while per_event_latency_ns > current_max {
1518 match entry.max_latency_ns.compare_exchange_weak(
1519 current_max,
1520 per_event_latency_ns,
1521 Ordering::Relaxed,
1522 Ordering::Relaxed,
1523 ) {
1524 Ok(_) => break,
1525 Err(actual) => current_max = actual,
1526 }
1527 }
1528
1529 let partition_id_str = partition_id.to_string();
1531 self.partition_events_total
1532 .with_label_values(&[&partition_id_str])
1533 .set(entry.event_count.load(Ordering::Relaxed) as i64);
1534 self.partition_write_latency
1535 .with_label_values(&[&partition_id_str])
1536 .observe(latency.as_secs_f64());
1537 }
1538
1539 pub fn get_partition_stats(&self, partition_id: u32) -> Option<PartitionStats> {
1541 if partition_id >= self.partition_count {
1542 return None;
1543 }
1544
1545 let entry = &self.partitions[partition_id as usize];
1546
1547 Some(PartitionStats {
1548 partition_id,
1549 event_count: entry.event_count.load(Ordering::Relaxed),
1550 total_latency_ns: entry.total_latency_ns.load(Ordering::Relaxed),
1551 write_count: entry.write_count.load(Ordering::Relaxed),
1552 min_latency_ns: entry.min_latency_ns.load(Ordering::Relaxed),
1553 max_latency_ns: entry.max_latency_ns.load(Ordering::Relaxed),
1554 error_count: entry.error_count.load(Ordering::Relaxed),
1555 })
1556 }
1557
1558 pub fn get_all_partition_stats(&self) -> Vec<PartitionStats> {
1560 (0..self.partition_count)
1561 .filter_map(|id| self.get_partition_stats(id))
1562 .collect()
1563 }
1564
1565 pub fn detect_partition_imbalance(&self) -> Vec<PartitionImbalanceAlert> {
1573 let mut alerts = Vec::new();
1574 let stats = self.get_all_partition_stats();
1575
1576 let total_events: u64 = stats.iter().map(|s| s.event_count).sum();
1578 let active_partitions = stats.iter().filter(|s| s.event_count > 0).count();
1579
1580 if active_partitions == 0 {
1581 return alerts;
1582 }
1583
1584 let average_count = total_events as f64 / active_partitions as f64;
1585 let imbalance_threshold = 2.0; for stat in stats {
1588 if stat.event_count == 0 {
1589 continue;
1590 }
1591
1592 let ratio = stat.event_count as f64 / average_count;
1593
1594 if ratio > imbalance_threshold {
1595 alerts.push(PartitionImbalanceAlert {
1596 partition_id: stat.partition_id,
1597 event_count: stat.event_count,
1598 average_count,
1599 ratio_to_average: ratio,
1600 message: format!(
1601 "Partition {} has {:.1}x average load ({} events vs {:.0} avg)",
1602 stat.partition_id, ratio, stat.event_count, average_count
1603 ),
1604 timestamp: chrono::Utc::now(),
1605 });
1606 }
1607 }
1608
1609 alerts
1610 }
1611
1612 pub fn partition_count(&self) -> u32 {
1614 self.partition_count
1615 }
1616
1617 pub fn total_events(&self) -> u64 {
1619 self.partitions
1620 .iter()
1621 .map(|e| e.event_count.load(Ordering::Relaxed))
1622 .sum()
1623 }
1624
1625 pub fn total_errors(&self) -> u64 {
1627 self.partitions
1628 .iter()
1629 .map(|e| e.error_count.load(Ordering::Relaxed))
1630 .sum()
1631 }
1632
1633 pub fn uptime(&self) -> Duration {
1635 self.started_at.elapsed()
1636 }
1637
1638 pub fn registry(&self) -> &Registry {
1640 &self.registry
1641 }
1642
1643 pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
1645 use prometheus::Encoder;
1646 let encoder = prometheus::TextEncoder::new();
1647 let metric_families = self.registry.gather();
1648 let mut buffer = Vec::new();
1649 encoder.encode(&metric_families, &mut buffer)?;
1650 Ok(String::from_utf8(buffer)?)
1651 }
1652
1653 pub fn get_distribution(&self) -> HashMap<u32, u64> {
1655 self.partitions
1656 .iter()
1657 .enumerate()
1658 .map(|(id, entry)| (id as u32, entry.event_count.load(Ordering::Relaxed)))
1659 .collect()
1660 }
1661
1662 pub fn reset(&self) {
1664 for entry in &self.partitions {
1665 entry.event_count.store(0, Ordering::Relaxed);
1666 entry.total_latency_ns.store(0, Ordering::Relaxed);
1667 entry.write_count.store(0, Ordering::Relaxed);
1668 entry.min_latency_ns.store(u64::MAX, Ordering::Relaxed);
1669 entry.max_latency_ns.store(0, Ordering::Relaxed);
1670 entry.error_count.store(0, Ordering::Relaxed);
1671 }
1672 }
1673}
1674
1675impl Default for PartitionMetrics {
1676 fn default() -> Self {
1677 Self::with_default_partitions()
1678 }
1679}
1680
#[cfg(test)]
mod partition_tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_partition_metrics_creation() {
        let metrics = PartitionMetrics::new(32);
        assert_eq!(metrics.partition_count(), 32);
        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    #[test]
    fn test_partition_metrics_default() {
        let metrics = PartitionMetrics::default();
        assert_eq!(metrics.partition_count(), 32);
    }

    #[test]
    fn test_record_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(1, Duration::from_micros(150));

        let stats0 = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats0.event_count, 2);
        assert_eq!(stats0.write_count, 2);

        let stats1 = metrics.get_partition_stats(1).unwrap();
        assert_eq!(stats1.event_count, 1);
    }

    #[test]
    fn test_record_batch_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_batch_write(5, 100, Duration::from_millis(10));

        let stats = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats.event_count, 100);
        assert_eq!(stats.write_count, 1);
    }

    #[test]
    fn test_batch_write_tracks_per_event_latency_bounds() {
        let metrics = PartitionMetrics::new(4);

        // 10 events in 1 ms -> 100 µs per event; 4 events in 800 µs -> 200 µs per event.
        metrics.record_batch_write(0, 10, Duration::from_millis(1));
        metrics.record_batch_write(0, 4, Duration::from_micros(800));

        let stats = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats.event_count, 14);
        assert_eq!(stats.write_count, 2);
        assert_eq!(stats.min_latency_ns, 100_000);
        assert_eq!(stats.max_latency_ns, 200_000);
        // The average is per write call, not per event: (1000 µs + 800 µs) / 2.
        assert_eq!(stats.avg_latency(), Some(Duration::from_micros(900)));
    }

    #[test]
    fn test_record_error() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_error(3);
        metrics.record_error(3);
        metrics.record_error(5);

        let stats3 = metrics.get_partition_stats(3).unwrap();
        assert_eq!(stats3.error_count, 2);

        let stats5 = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats5.error_count, 1);

        assert_eq!(metrics.total_errors(), 3);
    }

    #[test]
    fn test_invalid_partition_id() {
        let metrics = PartitionMetrics::new(32);

        // Out-of-range ids must be ignored rather than panic.
        metrics.record_write(100, Duration::from_micros(100));
        metrics.record_error(100);

        assert!(metrics.get_partition_stats(100).is_none());
    }

    #[test]
    fn test_invalid_partition_id_batch_write() {
        let metrics = PartitionMetrics::new(4);

        // Id 4 is the first out-of-range id for 4 partitions.
        metrics.record_batch_write(4, 10, Duration::from_micros(100));

        assert_eq!(metrics.total_events(), 0);
        assert!(metrics.get_partition_stats(4).is_none());
    }

    #[test]
    fn test_latency_tracking() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(0, Duration::from_micros(300));

        let stats = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats.min_latency_ns, 100_000);
        assert_eq!(stats.max_latency_ns, 300_000);

        let avg = stats.avg_latency().unwrap();
        assert_eq!(avg, Duration::from_nanos(200_000));
    }

    #[test]
    fn test_detect_partition_imbalance_no_imbalance() {
        let metrics = PartitionMetrics::new(4);

        for i in 0..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(
            alerts.is_empty(),
            "No alerts expected for balanced partitions"
        );
    }

    #[test]
    fn test_detect_partition_imbalance_hot_partition() {
        let metrics = PartitionMetrics::new(4);

        // Partition 0 gets 500 events, the others 100 each: avg 200, ratio 2.5.
        for _ in 0..500 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert_eq!(alerts.len(), 1, "Expected one alert for hot partition");
        assert_eq!(alerts[0].partition_id, 0);
        assert!(alerts[0].ratio_to_average > 2.0);
    }

    #[test]
    fn test_detect_partition_imbalance_empty() {
        let metrics = PartitionMetrics::new(4);

        let alerts = metrics.detect_partition_imbalance();
        assert!(alerts.is_empty(), "No alerts expected for empty metrics");
    }

    #[test]
    fn test_get_all_partition_stats() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(200));

        let all_stats = metrics.get_all_partition_stats();
        assert_eq!(all_stats.len(), 4);
        assert_eq!(all_stats[0].event_count, 1);
        assert_eq!(all_stats[1].event_count, 0);
        assert_eq!(all_stats[2].event_count, 1);
        assert_eq!(all_stats[3].event_count, 0);
    }

    #[test]
    fn test_prometheus_encoding() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(1, Duration::from_micros(200));
        metrics.record_error(0);

        let encoded = metrics.encode().unwrap();

        assert!(encoded.contains("allsource_partition_events_total"));
        assert!(encoded.contains("allsource_partition_write_latency"));
        assert!(encoded.contains("allsource_partition_errors_total"));
    }

    #[test]
    fn test_prometheus_event_gauge_value() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_batch_write(0, 5, Duration::from_micros(100));

        let encoded = metrics.encode().unwrap();
        assert!(
            encoded.contains("allsource_partition_events_total{partition_id=\"0\"} 6"),
            "unexpected exposition:\n{encoded}"
        );
    }

    #[test]
    fn test_reset() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_error(1);

        assert_eq!(metrics.total_events(), 1);
        assert_eq!(metrics.total_errors(), 1);

        metrics.reset();

        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    #[test]
    fn test_concurrent_writes() {
        let metrics = Arc::new(PartitionMetrics::new(32));
        let mut handles = vec![];

        // 8 threads x 1000 writes, spread round-robin over all 32 partitions.
        for _ in 0..8 {
            let metrics_clone = metrics.clone();
            let handle = thread::spawn(move || {
                for i in 0..1000 {
                    let partition_id = (i % 32) as u32;
                    metrics_clone.record_write(partition_id, Duration::from_micros(100));
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(metrics.total_events(), 8000);
    }

    #[test]
    fn test_get_distribution() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(100));

        let distribution = metrics.get_distribution();

        assert_eq!(distribution.get(&0), Some(&2));
        assert_eq!(distribution.get(&1), Some(&0));
        assert_eq!(distribution.get(&2), Some(&1));
        assert_eq!(distribution.get(&3), Some(&0));
    }

    #[test]
    fn test_partition_stats_avg_latency_none() {
        let stats = PartitionStats::new(0);
        assert!(stats.avg_latency().is_none());
    }

    #[test]
    fn test_alert_message_format() {
        let metrics = PartitionMetrics::new(4);

        // Partition 0 is heavily overloaded relative to the others.
        for _ in 0..1000 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(!alerts.is_empty());

        let alert = &alerts[0];
        assert!(alert.message.contains("Partition 0"));
        assert!(alert.message.contains("average load"));
    }

    #[test]
    fn test_uptime() {
        let metrics = PartitionMetrics::new(4);

        thread::sleep(Duration::from_millis(10));

        let uptime = metrics.uptime();
        assert!(uptime.as_millis() >= 10);
    }
}