//! allsource_core/infrastructure/observability/metrics.rs
//!
//! Centralized Prometheus metrics registry for AllSource.
1use prometheus::{
2    Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
3    Registry,
4};
5use serde::Serialize;
6use std::{
7    collections::HashMap,
8    sync::{
9        Arc,
10        atomic::{AtomicU64, Ordering},
11    },
12    time::{Duration, Instant},
13};
14
/// Centralized metrics registry for AllSource.
///
/// Owns one private Prometheus [`Registry`] and every collector registered
/// against it. Build it exactly once with [`MetricsRegistry::new`] and share
/// the returned `Arc<MetricsRegistry>` — `Clone`/`Default` are deliberately
/// not implemented (see the note near the bottom of this file).
pub struct MetricsRegistry {
    /// Prometheus registry; every public collector below is registered here.
    registry: Registry,

    // Event ingestion metrics
    /// Total events ingested across all types.
    pub events_ingested_total: IntCounter,
    /// Events ingested, partitioned by `event_type`.
    pub events_ingested_by_type: IntCounterVec,
    /// Latency of ingestion operations, in seconds.
    pub ingestion_duration_seconds: Histogram,
    /// Total ingestion failures.
    pub ingestion_errors_total: IntCounter,

    // Query metrics
    /// Queries executed, partitioned by `query_type`.
    pub queries_total: IntCounterVec,
    /// Query latency in seconds, partitioned by `query_type`.
    pub query_duration_seconds: HistogramVec,
    /// Events returned by queries, partitioned by `query_type`.
    pub query_results_total: IntCounterVec,

    // Storage metrics
    /// Events currently held in storage.
    pub storage_events_total: IntGauge,
    /// Entities currently held in storage.
    pub storage_entities_total: IntGauge,
    /// Total storage size in bytes.
    pub storage_size_bytes: IntGauge,
    /// Number of Parquet files.
    pub parquet_files_total: IntGauge,
    /// Number of WAL segments.
    pub wal_segments_total: IntGauge,

    // Projection metrics
    /// Number of registered projections.
    pub projections_total: IntGauge,
    /// Events processed, partitioned by `projection_name`.
    pub projection_events_processed: IntCounterVec,
    /// Projection failures, partitioned by `projection_name`.
    pub projection_errors_total: IntCounterVec,
    /// Per-projection processing latency, partitioned by `projection_name`.
    pub projection_processing_duration: HistogramVec,
    /// Overall projection-manager processing latency.
    pub projection_duration_seconds: Histogram,

    // Schema registry metrics (v0.5)
    /// Total schemas registered.
    pub schemas_registered_total: IntCounter,
    /// Schema validations, partitioned by `subject` and `result`.
    pub schema_validations_total: IntCounterVec,
    /// Schema validation latency in seconds.
    pub schema_validation_duration: Histogram,

    // Replay metrics (v0.5)
    /// Replays started.
    pub replays_started_total: IntCounter,
    /// Replays completed.
    pub replays_completed_total: IntCounter,
    /// Replays failed.
    pub replays_failed_total: IntCounter,
    /// Events processed during replays.
    pub replay_events_processed: IntCounter,
    /// End-to-end replay duration in seconds.
    pub replay_duration_seconds: Histogram,

    // Pipeline metrics (v0.5)
    /// Number of registered pipelines (a gauge despite the `_total` suffix).
    pub pipelines_registered_total: IntGauge,
    /// Events processed, partitioned by `pipeline_id` and `pipeline_name`.
    pub pipeline_events_processed: IntCounterVec,
    /// Events filtered out, partitioned by `pipeline_id` and `pipeline_name`.
    pub pipeline_events_filtered: IntCounterVec,
    /// Pipeline failures, partitioned by `pipeline_name` only.
    pub pipeline_errors_total: IntCounterVec,
    /// Per-pipeline latency, partitioned by `pipeline_id` and `pipeline_name`.
    pub pipeline_processing_duration: HistogramVec,
    /// Overall pipeline-manager processing latency.
    pub pipeline_duration_seconds: Histogram,

    // Snapshot metrics
    /// Snapshots created.
    pub snapshots_created_total: IntCounter,
    /// Snapshot creation latency in seconds.
    pub snapshot_creation_duration: Histogram,
    /// Total number of snapshots.
    pub snapshots_total: IntGauge,

    // Compaction metrics
    /// Compaction runs performed.
    pub compactions_total: IntCounter,
    /// Compaction run duration in seconds.
    pub compaction_duration_seconds: Histogram,
    /// Files merged across all compactions.
    pub compaction_files_merged: IntCounter,
    /// Bytes saved across all compactions.
    pub compaction_bytes_saved: IntCounter,

    // WebSocket metrics
    /// Currently open WebSocket connections.
    pub websocket_connections_active: IntGauge,
    /// WebSocket connections ever accepted.
    pub websocket_connections_total: IntCounter,
    /// WebSocket messages sent.
    pub websocket_messages_sent: IntCounter,
    /// WebSocket errors.
    pub websocket_errors_total: IntCounter,

    // System metrics
    /// HTTP requests, partitioned by `method`, `endpoint`, and `status`.
    pub http_requests_total: IntCounterVec,
    /// HTTP request latency, partitioned by `method` and `endpoint`.
    pub http_request_duration_seconds: HistogramVec,
    /// HTTP requests currently being processed.
    pub http_requests_in_flight: IntGauge,

    // Replication metrics (leader)
    /// Followers currently connected to this leader.
    pub replication_followers_connected: IntGauge,
    /// WAL entries shipped to followers.
    pub replication_wal_shipped_total: IntCounter,
    /// Bytes shipped to followers.
    pub replication_wal_shipped_bytes_total: IntCounter,
    /// Replication lag in seconds, partitioned by `follower_id`.
    pub replication_follower_lag_seconds: IntGaugeVec,
    /// ACKs received from followers.
    pub replication_acks_total: IntCounter,

    // Replication ACK wait metric (semi-sync/sync mode)
    /// Time spent waiting for follower ACKs in semi-sync/sync mode.
    pub replication_ack_wait_seconds: Histogram,

    // Replication metrics (follower)
    /// WAL entries received from the leader.
    pub replication_wal_received_total: IntCounter,
    /// WAL entries replayed into the DashMap store.
    pub replication_wal_replayed_total: IntCounter,
    /// This follower's lag behind the leader, in seconds.
    pub replication_lag_seconds: IntGauge,
    /// 1 when connected to the leader, 0 when disconnected.
    pub replication_connected: IntGauge,
    /// Reconnection attempts to the leader.
    pub replication_reconnects_total: IntCounter,
}
104
105impl MetricsRegistry {
106    pub fn new() -> Arc<Self> {
107        let registry = Registry::new();
108
109        // Event ingestion metrics
110        let events_ingested_total = IntCounter::with_opts(Opts::new(
111            "allsource_events_ingested_total",
112            "Total number of events ingested",
113        ))
114        .unwrap();
115
116        let events_ingested_by_type = IntCounterVec::new(
117            Opts::new(
118                "allsource_events_ingested_by_type",
119                "Events ingested by type",
120            ),
121            &["event_type"],
122        )
123        .unwrap();
124
125        let ingestion_duration_seconds = Histogram::with_opts(HistogramOpts::new(
126            "allsource_ingestion_duration_seconds",
127            "Event ingestion duration in seconds",
128        ))
129        .unwrap();
130
131        let ingestion_errors_total = IntCounter::with_opts(Opts::new(
132            "allsource_ingestion_errors_total",
133            "Total number of ingestion errors",
134        ))
135        .unwrap();
136
137        // Query metrics
138        let queries_total = IntCounterVec::new(
139            Opts::new("allsource_queries_total", "Total number of queries"),
140            &["query_type"],
141        )
142        .unwrap();
143
144        let query_duration_seconds = HistogramVec::new(
145            HistogramOpts::new(
146                "allsource_query_duration_seconds",
147                "Query duration in seconds",
148            ),
149            &["query_type"],
150        )
151        .unwrap();
152
153        let query_results_total = IntCounterVec::new(
154            Opts::new(
155                "allsource_query_results_total",
156                "Total number of events returned by queries",
157            ),
158            &["query_type"],
159        )
160        .unwrap();
161
162        // Storage metrics
163        let storage_events_total = IntGauge::with_opts(Opts::new(
164            "allsource_storage_events_total",
165            "Total number of events in storage",
166        ))
167        .unwrap();
168
169        let storage_entities_total = IntGauge::with_opts(Opts::new(
170            "allsource_storage_entities_total",
171            "Total number of entities in storage",
172        ))
173        .unwrap();
174
175        let storage_size_bytes = IntGauge::with_opts(Opts::new(
176            "allsource_storage_size_bytes",
177            "Total storage size in bytes",
178        ))
179        .unwrap();
180
181        let parquet_files_total = IntGauge::with_opts(Opts::new(
182            "allsource_parquet_files_total",
183            "Number of Parquet files",
184        ))
185        .unwrap();
186
187        let wal_segments_total = IntGauge::with_opts(Opts::new(
188            "allsource_wal_segments_total",
189            "Number of WAL segments",
190        ))
191        .unwrap();
192
193        // Projection metrics
194        let projection_events_processed = IntCounterVec::new(
195            Opts::new(
196                "allsource_projection_events_processed",
197                "Events processed by projections",
198            ),
199            &["projection_name"],
200        )
201        .unwrap();
202
203        let projection_errors_total = IntCounterVec::new(
204            Opts::new(
205                "allsource_projection_errors_total",
206                "Total projection errors",
207            ),
208            &["projection_name"],
209        )
210        .unwrap();
211
212        let projection_processing_duration = HistogramVec::new(
213            HistogramOpts::new(
214                "allsource_projection_processing_duration_seconds",
215                "Projection processing duration",
216            ),
217            &["projection_name"],
218        )
219        .unwrap();
220
221        let projections_total = IntGauge::with_opts(Opts::new(
222            "allsource_projections_total",
223            "Number of registered projections",
224        ))
225        .unwrap();
226
227        let projection_duration_seconds = Histogram::with_opts(HistogramOpts::new(
228            "allsource_projection_duration_seconds",
229            "Overall projection manager processing duration",
230        ))
231        .unwrap();
232
233        // Schema registry metrics (v0.5)
234        let schemas_registered_total = IntCounter::with_opts(Opts::new(
235            "allsource_schemas_registered_total",
236            "Total number of schemas registered",
237        ))
238        .unwrap();
239
240        let schema_validations_total = IntCounterVec::new(
241            Opts::new(
242                "allsource_schema_validations_total",
243                "Schema validations by result",
244            ),
245            &["subject", "result"],
246        )
247        .unwrap();
248
249        let schema_validation_duration = Histogram::with_opts(HistogramOpts::new(
250            "allsource_schema_validation_duration_seconds",
251            "Schema validation duration",
252        ))
253        .unwrap();
254
255        // Replay metrics (v0.5)
256        let replays_started_total = IntCounter::with_opts(Opts::new(
257            "allsource_replays_started_total",
258            "Total replays started",
259        ))
260        .unwrap();
261
262        let replays_completed_total = IntCounter::with_opts(Opts::new(
263            "allsource_replays_completed_total",
264            "Total replays completed",
265        ))
266        .unwrap();
267
268        let replays_failed_total = IntCounter::with_opts(Opts::new(
269            "allsource_replays_failed_total",
270            "Total replays failed",
271        ))
272        .unwrap();
273
274        let replay_events_processed = IntCounter::with_opts(Opts::new(
275            "allsource_replay_events_processed",
276            "Events processed during replays",
277        ))
278        .unwrap();
279
280        let replay_duration_seconds = Histogram::with_opts(HistogramOpts::new(
281            "allsource_replay_duration_seconds",
282            "Replay duration",
283        ))
284        .unwrap();
285
286        // Pipeline metrics (v0.5)
287        let pipelines_registered_total = IntGauge::with_opts(Opts::new(
288            "allsource_pipelines_registered_total",
289            "Number of registered pipelines",
290        ))
291        .unwrap();
292
293        let pipeline_events_processed = IntCounterVec::new(
294            Opts::new(
295                "allsource_pipeline_events_processed",
296                "Events processed by pipelines",
297            ),
298            &["pipeline_id", "pipeline_name"],
299        )
300        .unwrap();
301
302        let pipeline_events_filtered = IntCounterVec::new(
303            Opts::new(
304                "allsource_pipeline_events_filtered",
305                "Events filtered by pipelines",
306            ),
307            &["pipeline_id", "pipeline_name"],
308        )
309        .unwrap();
310
311        let pipeline_processing_duration = HistogramVec::new(
312            HistogramOpts::new(
313                "allsource_pipeline_processing_duration_seconds",
314                "Pipeline processing duration",
315            ),
316            &["pipeline_id", "pipeline_name"],
317        )
318        .unwrap();
319
320        let pipeline_errors_total = IntCounterVec::new(
321            Opts::new("allsource_pipeline_errors_total", "Total pipeline errors"),
322            &["pipeline_name"],
323        )
324        .unwrap();
325
326        let pipeline_duration_seconds = Histogram::with_opts(HistogramOpts::new(
327            "allsource_pipeline_duration_seconds",
328            "Overall pipeline manager processing duration",
329        ))
330        .unwrap();
331
332        // Snapshot metrics
333        let snapshots_created_total = IntCounter::with_opts(Opts::new(
334            "allsource_snapshots_created_total",
335            "Total snapshots created",
336        ))
337        .unwrap();
338
339        let snapshot_creation_duration = Histogram::with_opts(HistogramOpts::new(
340            "allsource_snapshot_creation_duration_seconds",
341            "Snapshot creation duration",
342        ))
343        .unwrap();
344
345        let snapshots_total = IntGauge::with_opts(Opts::new(
346            "allsource_snapshots_total",
347            "Total number of snapshots",
348        ))
349        .unwrap();
350
351        // Compaction metrics
352        let compactions_total = IntCounter::with_opts(Opts::new(
353            "allsource_compactions_total",
354            "Total compactions performed",
355        ))
356        .unwrap();
357
358        let compaction_duration_seconds = Histogram::with_opts(HistogramOpts::new(
359            "allsource_compaction_duration_seconds",
360            "Compaction duration",
361        ))
362        .unwrap();
363
364        let compaction_files_merged = IntCounter::with_opts(Opts::new(
365            "allsource_compaction_files_merged",
366            "Files merged during compaction",
367        ))
368        .unwrap();
369
370        let compaction_bytes_saved = IntCounter::with_opts(Opts::new(
371            "allsource_compaction_bytes_saved",
372            "Bytes saved by compaction",
373        ))
374        .unwrap();
375
376        // WebSocket metrics
377        let websocket_connections_active = IntGauge::with_opts(Opts::new(
378            "allsource_websocket_connections_active",
379            "Active WebSocket connections",
380        ))
381        .unwrap();
382
383        let websocket_connections_total = IntCounter::with_opts(Opts::new(
384            "allsource_websocket_connections_total",
385            "Total WebSocket connections",
386        ))
387        .unwrap();
388
389        let websocket_messages_sent = IntCounter::with_opts(Opts::new(
390            "allsource_websocket_messages_sent",
391            "WebSocket messages sent",
392        ))
393        .unwrap();
394
395        let websocket_errors_total = IntCounter::with_opts(Opts::new(
396            "allsource_websocket_errors_total",
397            "WebSocket errors",
398        ))
399        .unwrap();
400
401        // System metrics
402        let http_requests_total = IntCounterVec::new(
403            Opts::new("allsource_http_requests_total", "Total HTTP requests"),
404            &["method", "endpoint", "status"],
405        )
406        .unwrap();
407
408        let http_request_duration_seconds = HistogramVec::new(
409            HistogramOpts::new(
410                "allsource_http_request_duration_seconds",
411                "HTTP request duration",
412            ),
413            &["method", "endpoint"],
414        )
415        .unwrap();
416
417        let http_requests_in_flight = IntGauge::with_opts(Opts::new(
418            "allsource_http_requests_in_flight",
419            "HTTP requests currently being processed",
420        ))
421        .unwrap();
422
423        // Replication metrics (leader)
424        let replication_followers_connected = IntGauge::with_opts(Opts::new(
425            "allsource_replication_followers_connected",
426            "Number of connected followers",
427        ))
428        .unwrap();
429
430        let replication_wal_shipped_total = IntCounter::with_opts(Opts::new(
431            "allsource_replication_wal_shipped_total",
432            "Total WAL entries shipped to followers",
433        ))
434        .unwrap();
435
436        let replication_wal_shipped_bytes_total = IntCounter::with_opts(Opts::new(
437            "allsource_replication_wal_shipped_bytes_total",
438            "Total bytes shipped to followers",
439        ))
440        .unwrap();
441
442        let replication_follower_lag_seconds = IntGaugeVec::new(
443            Opts::new(
444                "allsource_replication_follower_lag_seconds",
445                "Per-follower replication lag in seconds",
446            ),
447            &["follower_id"],
448        )
449        .unwrap();
450
451        let replication_acks_total = IntCounter::with_opts(Opts::new(
452            "allsource_replication_acks_total",
453            "Total ACKs received from followers",
454        ))
455        .unwrap();
456
457        let replication_ack_wait_seconds = Histogram::with_opts(
458            HistogramOpts::new(
459                "allsource_replication_ack_wait_seconds",
460                "Time spent waiting for follower ACKs in semi-sync/sync mode",
461            )
462            .buckets(vec![
463                0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
464            ]),
465        )
466        .unwrap();
467
468        // Replication metrics (follower)
469        let replication_wal_received_total = IntCounter::with_opts(Opts::new(
470            "allsource_replication_wal_received_total",
471            "Total WAL entries received from leader",
472        ))
473        .unwrap();
474
475        let replication_wal_replayed_total = IntCounter::with_opts(Opts::new(
476            "allsource_replication_wal_replayed_total",
477            "Total WAL entries replayed into DashMap",
478        ))
479        .unwrap();
480
481        let replication_lag_seconds = IntGauge::with_opts(Opts::new(
482            "allsource_replication_lag_seconds",
483            "Replication lag behind leader in seconds",
484        ))
485        .unwrap();
486
487        let replication_connected = IntGauge::with_opts(Opts::new(
488            "allsource_replication_connected",
489            "Whether connected to leader (1=connected, 0=disconnected)",
490        ))
491        .unwrap();
492
493        let replication_reconnects_total = IntCounter::with_opts(Opts::new(
494            "allsource_replication_reconnects_total",
495            "Total reconnection attempts to leader",
496        ))
497        .unwrap();
498
499        // Register all metrics
500        registry
501            .register(Box::new(events_ingested_total.clone()))
502            .unwrap();
503        registry
504            .register(Box::new(events_ingested_by_type.clone()))
505            .unwrap();
506        registry
507            .register(Box::new(ingestion_duration_seconds.clone()))
508            .unwrap();
509        registry
510            .register(Box::new(ingestion_errors_total.clone()))
511            .unwrap();
512
513        registry.register(Box::new(queries_total.clone())).unwrap();
514        registry
515            .register(Box::new(query_duration_seconds.clone()))
516            .unwrap();
517        registry
518            .register(Box::new(query_results_total.clone()))
519            .unwrap();
520
521        registry
522            .register(Box::new(storage_events_total.clone()))
523            .unwrap();
524        registry
525            .register(Box::new(storage_entities_total.clone()))
526            .unwrap();
527        registry
528            .register(Box::new(storage_size_bytes.clone()))
529            .unwrap();
530        registry
531            .register(Box::new(parquet_files_total.clone()))
532            .unwrap();
533        registry
534            .register(Box::new(wal_segments_total.clone()))
535            .unwrap();
536
537        registry
538            .register(Box::new(projection_events_processed.clone()))
539            .unwrap();
540        registry
541            .register(Box::new(projection_errors_total.clone()))
542            .unwrap();
543        registry
544            .register(Box::new(projection_processing_duration.clone()))
545            .unwrap();
546        registry
547            .register(Box::new(projections_total.clone()))
548            .unwrap();
549        registry
550            .register(Box::new(projection_duration_seconds.clone()))
551            .unwrap();
552
553        registry
554            .register(Box::new(schemas_registered_total.clone()))
555            .unwrap();
556        registry
557            .register(Box::new(schema_validations_total.clone()))
558            .unwrap();
559        registry
560            .register(Box::new(schema_validation_duration.clone()))
561            .unwrap();
562
563        registry
564            .register(Box::new(replays_started_total.clone()))
565            .unwrap();
566        registry
567            .register(Box::new(replays_completed_total.clone()))
568            .unwrap();
569        registry
570            .register(Box::new(replays_failed_total.clone()))
571            .unwrap();
572        registry
573            .register(Box::new(replay_events_processed.clone()))
574            .unwrap();
575        registry
576            .register(Box::new(replay_duration_seconds.clone()))
577            .unwrap();
578
579        registry
580            .register(Box::new(pipelines_registered_total.clone()))
581            .unwrap();
582        registry
583            .register(Box::new(pipeline_events_processed.clone()))
584            .unwrap();
585        registry
586            .register(Box::new(pipeline_events_filtered.clone()))
587            .unwrap();
588        registry
589            .register(Box::new(pipeline_processing_duration.clone()))
590            .unwrap();
591        registry
592            .register(Box::new(pipeline_errors_total.clone()))
593            .unwrap();
594        registry
595            .register(Box::new(pipeline_duration_seconds.clone()))
596            .unwrap();
597
598        registry
599            .register(Box::new(snapshots_created_total.clone()))
600            .unwrap();
601        registry
602            .register(Box::new(snapshot_creation_duration.clone()))
603            .unwrap();
604        registry
605            .register(Box::new(snapshots_total.clone()))
606            .unwrap();
607
608        registry
609            .register(Box::new(compactions_total.clone()))
610            .unwrap();
611        registry
612            .register(Box::new(compaction_duration_seconds.clone()))
613            .unwrap();
614        registry
615            .register(Box::new(compaction_files_merged.clone()))
616            .unwrap();
617        registry
618            .register(Box::new(compaction_bytes_saved.clone()))
619            .unwrap();
620
621        registry
622            .register(Box::new(websocket_connections_active.clone()))
623            .unwrap();
624        registry
625            .register(Box::new(websocket_connections_total.clone()))
626            .unwrap();
627        registry
628            .register(Box::new(websocket_messages_sent.clone()))
629            .unwrap();
630        registry
631            .register(Box::new(websocket_errors_total.clone()))
632            .unwrap();
633
634        registry
635            .register(Box::new(http_requests_total.clone()))
636            .unwrap();
637        registry
638            .register(Box::new(http_request_duration_seconds.clone()))
639            .unwrap();
640        registry
641            .register(Box::new(http_requests_in_flight.clone()))
642            .unwrap();
643
644        registry
645            .register(Box::new(replication_followers_connected.clone()))
646            .unwrap();
647        registry
648            .register(Box::new(replication_wal_shipped_total.clone()))
649            .unwrap();
650        registry
651            .register(Box::new(replication_wal_shipped_bytes_total.clone()))
652            .unwrap();
653        registry
654            .register(Box::new(replication_follower_lag_seconds.clone()))
655            .unwrap();
656        registry
657            .register(Box::new(replication_acks_total.clone()))
658            .unwrap();
659        registry
660            .register(Box::new(replication_ack_wait_seconds.clone()))
661            .unwrap();
662        registry
663            .register(Box::new(replication_wal_received_total.clone()))
664            .unwrap();
665        registry
666            .register(Box::new(replication_wal_replayed_total.clone()))
667            .unwrap();
668        registry
669            .register(Box::new(replication_lag_seconds.clone()))
670            .unwrap();
671        registry
672            .register(Box::new(replication_connected.clone()))
673            .unwrap();
674        registry
675            .register(Box::new(replication_reconnects_total.clone()))
676            .unwrap();
677
678        Arc::new(Self {
679            registry,
680            events_ingested_total,
681            events_ingested_by_type,
682            ingestion_duration_seconds,
683            ingestion_errors_total,
684            queries_total,
685            query_duration_seconds,
686            query_results_total,
687            storage_events_total,
688            storage_entities_total,
689            storage_size_bytes,
690            parquet_files_total,
691            wal_segments_total,
692            projection_events_processed,
693            projection_errors_total,
694            projection_processing_duration,
695            projections_total,
696            projection_duration_seconds,
697            schemas_registered_total,
698            schema_validations_total,
699            schema_validation_duration,
700            replays_started_total,
701            replays_completed_total,
702            replays_failed_total,
703            replay_events_processed,
704            replay_duration_seconds,
705            pipelines_registered_total,
706            pipeline_events_processed,
707            pipeline_events_filtered,
708            pipeline_processing_duration,
709            pipeline_errors_total,
710            pipeline_duration_seconds,
711            snapshots_created_total,
712            snapshot_creation_duration,
713            snapshots_total,
714            compactions_total,
715            compaction_duration_seconds,
716            compaction_files_merged,
717            compaction_bytes_saved,
718            websocket_connections_active,
719            websocket_connections_total,
720            websocket_messages_sent,
721            websocket_errors_total,
722            http_requests_total,
723            http_request_duration_seconds,
724            http_requests_in_flight,
725            replication_followers_connected,
726            replication_wal_shipped_total,
727            replication_wal_shipped_bytes_total,
728            replication_follower_lag_seconds,
729            replication_acks_total,
730            replication_ack_wait_seconds,
731            replication_wal_received_total,
732            replication_wal_replayed_total,
733            replication_lag_seconds,
734            replication_connected,
735            replication_reconnects_total,
736        })
737    }
738
739    /// Get the Prometheus registry
740    pub fn registry(&self) -> &Registry {
741        &self.registry
742    }
743
744    /// Encode metrics in Prometheus text format
745    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
746        use prometheus::Encoder;
747        let encoder = prometheus::TextEncoder::new();
748        let metric_families = self.registry.gather();
749        let mut buffer = Vec::new();
750        encoder.encode(&metric_families, &mut buffer)?;
751        Ok(String::from_utf8(buffer)?)
752    }
753}
754
755// Note: Clone and Default are intentionally NOT implemented for MetricsRegistry.
756// Use Arc<MetricsRegistry> to share the same registry across the application.
757// Creating multiple registries would result in duplicate metrics which is incorrect.
758
759#[cfg(test)]
760mod tests {
761    use super::*;
762
763    #[test]
764    fn test_metrics_registry_creation() {
765        let metrics = MetricsRegistry::new();
766        assert_eq!(metrics.events_ingested_total.get(), 0);
767        assert_eq!(metrics.storage_events_total.get(), 0);
768    }
769
770    #[test]
771    fn test_event_ingestion_metrics() {
772        let metrics = MetricsRegistry::new();
773
774        // Increment ingestion counter
775        metrics.events_ingested_total.inc();
776        assert_eq!(metrics.events_ingested_total.get(), 1);
777
778        // Increment by type
779        metrics
780            .events_ingested_by_type
781            .with_label_values(&["user.created"])
782            .inc();
783        assert_eq!(
784            metrics
785                .events_ingested_by_type
786                .with_label_values(&["user.created"])
787                .get(),
788            1
789        );
790
791        // Record duration
792        metrics.ingestion_duration_seconds.observe(0.1);
793    }
794
795    #[test]
796    fn test_query_metrics() {
797        let metrics = MetricsRegistry::new();
798
799        // Increment query counter
800        metrics
801            .queries_total
802            .with_label_values(&["entity_id"])
803            .inc();
804        assert_eq!(
805            metrics
806                .queries_total
807                .with_label_values(&["entity_id"])
808                .get(),
809            1
810        );
811
812        // Record query duration
813        metrics
814            .query_duration_seconds
815            .with_label_values(&["entity_id"])
816            .observe(0.05);
817
818        // Record query results
819        metrics
820            .query_results_total
821            .with_label_values(&["entity_id"])
822            .inc_by(10);
823    }
824
825    #[test]
826    fn test_storage_metrics() {
827        let metrics = MetricsRegistry::new();
828
829        // Set storage metrics
830        metrics.storage_events_total.set(1000);
831        assert_eq!(metrics.storage_events_total.get(), 1000);
832
833        metrics.storage_entities_total.set(50);
834        assert_eq!(metrics.storage_entities_total.get(), 50);
835
836        metrics.storage_size_bytes.set(1024 * 1024);
837        assert_eq!(metrics.storage_size_bytes.get(), 1024 * 1024);
838
839        metrics.parquet_files_total.set(5);
840        metrics.wal_segments_total.set(3);
841    }
842
843    #[test]
844    fn test_projection_metrics() {
845        let metrics = MetricsRegistry::new();
846
847        // Set projections total
848        metrics.projections_total.set(3);
849        assert_eq!(metrics.projections_total.get(), 3);
850
851        // Process events in projection
852        metrics
853            .projection_events_processed
854            .with_label_values(&["user_snapshot"])
855            .inc_by(100);
856
857        // Record processing duration
858        metrics
859            .projection_processing_duration
860            .with_label_values(&["user_snapshot"])
861            .observe(0.2);
862
863        // Record errors
864        metrics
865            .projection_errors_total
866            .with_label_values(&["user_snapshot"])
867            .inc();
868    }
869
870    #[test]
871    fn test_schema_metrics() {
872        let metrics = MetricsRegistry::new();
873
874        // Register schema
875        metrics.schemas_registered_total.inc();
876        assert_eq!(metrics.schemas_registered_total.get(), 1);
877
878        // Validation success - requires both subject and result labels
879        metrics
880            .schema_validations_total
881            .with_label_values(&["user.schema", "success"])
882            .inc();
883
884        // Validation failure
885        metrics
886            .schema_validations_total
887            .with_label_values(&["order.schema", "failure"])
888            .inc();
889
890        // Record validation duration
891        metrics.schema_validation_duration.observe(0.01);
892    }
893
894    #[test]
895    fn test_replay_metrics() {
896        let metrics = MetricsRegistry::new();
897
898        // Start replay
899        metrics.replays_started_total.inc();
900        assert_eq!(metrics.replays_started_total.get(), 1);
901
902        // Process events
903        metrics.replay_events_processed.inc_by(500);
904        assert_eq!(metrics.replay_events_processed.get(), 500);
905
906        // Complete replay
907        metrics.replays_completed_total.inc();
908        assert_eq!(metrics.replays_completed_total.get(), 1);
909
910        // Record duration
911        metrics.replay_duration_seconds.observe(5.5);
912    }
913
914    #[test]
915    fn test_pipeline_metrics() {
916        let metrics = MetricsRegistry::new();
917
918        // Register pipeline
919        metrics.pipelines_registered_total.set(2);
920        assert_eq!(metrics.pipelines_registered_total.get(), 2);
921
922        // Process events - requires both pipeline_id and pipeline_name labels
923        metrics
924            .pipeline_events_processed
925            .with_label_values(&["pipeline-1", "filter_pipeline"])
926            .inc_by(250);
927
928        // Record errors - only requires pipeline_name
929        metrics
930            .pipeline_errors_total
931            .with_label_values(&["filter_pipeline"])
932            .inc();
933
934        // Record duration - requires both pipeline_id and pipeline_name labels
935        metrics
936            .pipeline_processing_duration
937            .with_label_values(&["pipeline-1", "filter_pipeline"])
938            .observe(0.15);
939    }
940
941    #[test]
942    fn test_metrics_encode() {
943        let metrics = MetricsRegistry::new();
944
945        // Add some data
946        metrics.events_ingested_total.inc_by(100);
947        metrics.storage_events_total.set(1000);
948
949        // Encode to Prometheus format
950        let encoded = metrics.encode().unwrap();
951
952        // Verify output contains metrics
953        assert!(encoded.contains("events_ingested_total"));
954        assert!(encoded.contains("storage_events_total"));
955    }
956
957    #[test]
958    fn test_metrics_default() {
959        let metrics = MetricsRegistry::new();
960        assert_eq!(metrics.events_ingested_total.get(), 0);
961    }
962
963    #[test]
964    fn test_websocket_metrics() {
965        let metrics = MetricsRegistry::new();
966
967        // Connect client
968        metrics.websocket_connections_active.inc();
969        assert_eq!(metrics.websocket_connections_active.get(), 1);
970
971        // Total connections
972        metrics.websocket_connections_total.inc();
973
974        // Broadcast message
975        metrics.websocket_messages_sent.inc_by(10);
976        assert_eq!(metrics.websocket_messages_sent.get(), 10);
977
978        // Disconnect client
979        metrics.websocket_connections_active.dec();
980        assert_eq!(metrics.websocket_connections_active.get(), 0);
981
982        // Record error
983        metrics.websocket_errors_total.inc();
984    }
985
986    #[test]
987    fn test_compaction_metrics() {
988        let metrics = MetricsRegistry::new();
989
990        // Start compaction
991        metrics.compactions_total.inc();
992        assert_eq!(metrics.compactions_total.get(), 1);
993
994        // Record duration
995        metrics.compaction_duration_seconds.observe(5.2);
996
997        // Files merged
998        metrics.compaction_files_merged.inc_by(5);
999
1000        // Bytes saved
1001        metrics.compaction_bytes_saved.inc_by(1024 * 1024);
1002    }
1003
1004    #[test]
1005    fn test_snapshot_metrics() {
1006        let metrics = MetricsRegistry::new();
1007
1008        // Create snapshot
1009        metrics.snapshots_created_total.inc();
1010        assert_eq!(metrics.snapshots_created_total.get(), 1);
1011
1012        // Record duration
1013        metrics.snapshot_creation_duration.observe(0.5);
1014
1015        // Total snapshots
1016        metrics.snapshots_total.set(10);
1017        assert_eq!(metrics.snapshots_total.get(), 10);
1018    }
1019
1020    #[test]
1021    fn test_replication_leader_metrics() {
1022        let metrics = MetricsRegistry::new();
1023
1024        // Follower connected
1025        metrics.replication_followers_connected.set(2);
1026        assert_eq!(metrics.replication_followers_connected.get(), 2);
1027
1028        // WAL entries shipped
1029        metrics.replication_wal_shipped_total.inc_by(100);
1030        assert_eq!(metrics.replication_wal_shipped_total.get(), 100);
1031
1032        // Bytes shipped
1033        metrics
1034            .replication_wal_shipped_bytes_total
1035            .inc_by(1024 * 1024);
1036        assert_eq!(
1037            metrics.replication_wal_shipped_bytes_total.get(),
1038            1024 * 1024
1039        );
1040
1041        // Per-follower lag
1042        metrics
1043            .replication_follower_lag_seconds
1044            .with_label_values(&["follower-1"])
1045            .set(5);
1046        assert_eq!(
1047            metrics
1048                .replication_follower_lag_seconds
1049                .with_label_values(&["follower-1"])
1050                .get(),
1051            5
1052        );
1053
1054        // ACKs received
1055        metrics.replication_acks_total.inc_by(50);
1056        assert_eq!(metrics.replication_acks_total.get(), 50);
1057    }
1058
1059    #[test]
1060    fn test_replication_follower_metrics() {
1061        let metrics = MetricsRegistry::new();
1062
1063        // WAL entries received
1064        metrics.replication_wal_received_total.inc_by(200);
1065        assert_eq!(metrics.replication_wal_received_total.get(), 200);
1066
1067        // WAL entries replayed
1068        metrics.replication_wal_replayed_total.inc_by(195);
1069        assert_eq!(metrics.replication_wal_replayed_total.get(), 195);
1070
1071        // Lag behind leader
1072        metrics.replication_lag_seconds.set(3);
1073        assert_eq!(metrics.replication_lag_seconds.get(), 3);
1074
1075        // Connected state
1076        metrics.replication_connected.set(1);
1077        assert_eq!(metrics.replication_connected.get(), 1);
1078
1079        // Reconnects
1080        metrics.replication_reconnects_total.inc_by(2);
1081        assert_eq!(metrics.replication_reconnects_total.get(), 2);
1082    }
1083
1084    #[test]
1085    fn test_replication_metrics_in_encode() {
1086        let metrics = MetricsRegistry::new();
1087
1088        metrics.replication_followers_connected.set(1);
1089        metrics.replication_wal_shipped_total.inc();
1090        metrics.replication_connected.set(1);
1091
1092        let encoded = metrics.encode().unwrap();
1093        assert!(encoded.contains("allsource_replication_followers_connected"));
1094        assert!(encoded.contains("allsource_replication_wal_shipped_total"));
1095        assert!(encoded.contains("allsource_replication_connected"));
1096    }
1097
1098    #[test]
1099    fn test_http_metrics() {
1100        let metrics = MetricsRegistry::new();
1101
1102        // Record request
1103        metrics
1104            .http_requests_total
1105            .with_label_values(&["GET", "/api/events", "200"])
1106            .inc();
1107
1108        // Record duration
1109        metrics
1110            .http_request_duration_seconds
1111            .with_label_values(&["GET", "/api/events"])
1112            .observe(0.025);
1113
1114        // In-flight requests
1115        metrics.http_requests_in_flight.inc();
1116        assert_eq!(metrics.http_requests_in_flight.get(), 1);
1117
1118        metrics.http_requests_in_flight.dec();
1119        assert_eq!(metrics.http_requests_in_flight.get(), 0);
1120    }
1121}
1122
1123// =============================================================================
1124// Partition Metrics (SierraDB pattern)
1125// =============================================================================
1126
/// Per-partition statistics for detecting hot partitions and skew
///
/// SierraDB uses 32 fixed partitions for single-node, 1024+ for clusters.
/// This struct is a point-in-time snapshot of one partition's counters.
#[derive(Debug)]
pub struct PartitionStats {
    /// Partition ID (0 to partition_count-1)
    pub partition_id: u32,

    /// Total events written to this partition
    pub event_count: u64,

    /// Total write latency sum (nanoseconds) for calculating average
    pub total_latency_ns: u64,

    /// Number of writes for calculating average latency
    pub write_count: u64,

    /// Minimum write latency (nanoseconds); `u64::MAX` until the first write
    pub min_latency_ns: u64,

    /// Maximum write latency (nanoseconds)
    pub max_latency_ns: u64,

    /// Total error count for this partition
    pub error_count: u64,
}

impl PartitionStats {
    /// Zeroed statistics for the given partition. Min latency starts at
    /// `u64::MAX` so the first recorded write always becomes the minimum.
    fn new(partition_id: u32) -> Self {
        Self {
            partition_id,
            event_count: 0,
            write_count: 0,
            total_latency_ns: 0,
            min_latency_ns: u64::MAX,
            max_latency_ns: 0,
            error_count: 0,
        }
    }

    /// Average write latency, or `None` when no writes have been recorded.
    pub fn avg_latency(&self) -> Option<Duration> {
        if self.write_count == 0 {
            None
        } else {
            Some(Duration::from_nanos(self.total_latency_ns / self.write_count))
        }
    }
}
1175
/// Alert generated when partition imbalance is detected
///
/// Produced by `PartitionMetrics::detect_partition_imbalance` when a
/// partition carries more than 2x the average event load. Serializable so
/// it can be surfaced through APIs or structured logs.
#[derive(Debug, Clone, Serialize)]
pub struct PartitionImbalanceAlert {
    /// Partition ID that is imbalanced
    pub partition_id: u32,

    /// Event count for this partition
    pub event_count: u64,

    /// Average event count across all active (non-empty) partitions
    pub average_count: f64,

    /// Ratio compared to average (>2.0 triggers alert)
    pub ratio_to_average: f64,

    /// Human-readable alert message
    pub message: String,

    /// Timestamp (UTC) when alert was generated
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
1197
/// Internal structure for tracking partition metrics atomically
///
/// All fields are lock-free counters updated with `Ordering::Relaxed`;
/// readers take an approximate snapshot, which is fine for monitoring.
struct PartitionMetricsEntry {
    event_count: AtomicU64,
    total_latency_ns: AtomicU64,
    write_count: AtomicU64,
    min_latency_ns: AtomicU64,
    max_latency_ns: AtomicU64,
    error_count: AtomicU64,
}

impl PartitionMetricsEntry {
    /// Every counter starts at zero, except the minimum latency which
    /// starts at `u64::MAX` so the first observed write always replaces it.
    fn new() -> Self {
        Self {
            event_count: AtomicU64::new(0),
            write_count: AtomicU64::new(0),
            total_latency_ns: AtomicU64::new(0),
            error_count: AtomicU64::new(0),
            min_latency_ns: AtomicU64::new(u64::MAX),
            max_latency_ns: AtomicU64::new(0),
        }
    }
}
1220
/// Partition monitoring for detecting hot partitions and skew (SierraDB pattern)
///
/// # Design Pattern
/// Uses atomic operations for lock-free metric updates per partition.
/// Tracks event counts, write latencies, and error rates per partition.
///
/// # SierraDB Context
/// SierraDB uses fixed partitions (32 for single-node, 1024+ for clusters).
/// Detecting hot partitions is critical for:
/// - Load balancing decisions
/// - Identifying skewed hash functions
/// - Capacity planning
/// - Performance troubleshooting
///
/// # Imbalance Detection
/// A partition is considered imbalanced if it has >2x the average load.
/// This threshold is based on SierraDB's experience with production workloads.
///
/// # Example
/// ```ignore
/// let partition_metrics = PartitionMetrics::new(32);
///
/// // Record write to partition 5
/// let start = Instant::now();
/// // ... write operation ...
/// partition_metrics.record_write(5, start.elapsed());
///
/// // Check for imbalances
/// let alerts = partition_metrics.detect_partition_imbalance();
/// for alert in alerts {
///     tracing::warn!("Partition imbalance: {}", alert.message);
/// }
/// ```
pub struct PartitionMetrics {
    /// Number of partitions (fixed at construction)
    partition_count: u32,

    /// Per-partition metrics, indexed by partition ID
    partitions: Vec<PartitionMetricsEntry>,

    /// Prometheus metrics for per-partition event counts
    partition_events_total: IntGaugeVec,

    /// Prometheus metrics for per-partition write latency histogram
    partition_write_latency: HistogramVec,

    /// Prometheus metrics for per-partition error counts
    partition_errors_total: IntCounterVec,

    /// Prometheus registry dedicated to partition metrics (created in
    /// `new()`, separate from the main `MetricsRegistry`)
    registry: Registry,

    /// Timestamp when metrics collection started (reported by `uptime()`)
    started_at: Instant,
}
1276
1277impl PartitionMetrics {
1278    /// Create a new partition metrics tracker
1279    ///
1280    /// # Arguments
1281    /// * `partition_count` - Number of partitions (default: 32 for single-node)
1282    pub fn new(partition_count: u32) -> Self {
1283        let registry = Registry::new();
1284
1285        // Per-partition event count gauge
1286        let partition_events_total = IntGaugeVec::new(
1287            Opts::new(
1288                "allsource_partition_events_total",
1289                "Total events per partition",
1290            ),
1291            &["partition_id"],
1292        )
1293        .expect("Failed to create partition_events_total metric");
1294
1295        // Per-partition write latency histogram
1296        let partition_write_latency = HistogramVec::new(
1297            HistogramOpts::new(
1298                "allsource_partition_write_latency_seconds",
1299                "Write latency per partition in seconds",
1300            )
1301            .buckets(vec![
1302                0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
1303            ]),
1304            &["partition_id"],
1305        )
1306        .expect("Failed to create partition_write_latency metric");
1307
1308        // Per-partition error counter
1309        let partition_errors_total = IntCounterVec::new(
1310            Opts::new(
1311                "allsource_partition_errors_total",
1312                "Total errors per partition",
1313            ),
1314            &["partition_id"],
1315        )
1316        .expect("Failed to create partition_errors_total metric");
1317
1318        // Register metrics
1319        registry
1320            .register(Box::new(partition_events_total.clone()))
1321            .expect("Failed to register partition_events_total");
1322        registry
1323            .register(Box::new(partition_write_latency.clone()))
1324            .expect("Failed to register partition_write_latency");
1325        registry
1326            .register(Box::new(partition_errors_total.clone()))
1327            .expect("Failed to register partition_errors_total");
1328
1329        // Initialize per-partition atomic counters
1330        let partitions = (0..partition_count)
1331            .map(|_| PartitionMetricsEntry::new())
1332            .collect();
1333
1334        Self {
1335            partition_count,
1336            partitions,
1337            partition_events_total,
1338            partition_write_latency,
1339            partition_errors_total,
1340            registry,
1341            started_at: Instant::now(),
1342        }
1343    }
1344
1345    /// Create partition metrics with default partition count (32)
1346    pub fn with_default_partitions() -> Self {
1347        Self::new(32)
1348    }
1349
1350    /// Record a successful write to a partition
1351    ///
1352    /// # Arguments
1353    /// * `partition_id` - The partition ID (0 to partition_count-1)
1354    /// * `latency` - The write latency
1355    #[inline]
1356    pub fn record_write(&self, partition_id: u32, latency: Duration) {
1357        if partition_id >= self.partition_count {
1358            return;
1359        }
1360
1361        let entry = &self.partitions[partition_id as usize];
1362        let latency_ns = latency.as_nanos() as u64;
1363
1364        // Update atomic counters
1365        entry.event_count.fetch_add(1, Ordering::Relaxed);
1366        entry.write_count.fetch_add(1, Ordering::Relaxed);
1367        entry
1368            .total_latency_ns
1369            .fetch_add(latency_ns, Ordering::Relaxed);
1370
1371        // Update min latency (compare-and-swap loop)
1372        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1373        while latency_ns < current_min {
1374            match entry.min_latency_ns.compare_exchange_weak(
1375                current_min,
1376                latency_ns,
1377                Ordering::Relaxed,
1378                Ordering::Relaxed,
1379            ) {
1380                Ok(_) => break,
1381                Err(actual) => current_min = actual,
1382            }
1383        }
1384
1385        // Update max latency (compare-and-swap loop)
1386        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1387        while latency_ns > current_max {
1388            match entry.max_latency_ns.compare_exchange_weak(
1389                current_max,
1390                latency_ns,
1391                Ordering::Relaxed,
1392                Ordering::Relaxed,
1393            ) {
1394                Ok(_) => break,
1395                Err(actual) => current_max = actual,
1396            }
1397        }
1398
1399        // Update Prometheus metrics
1400        let partition_id_str = partition_id.to_string();
1401        self.partition_events_total
1402            .with_label_values(&[&partition_id_str])
1403            .set(entry.event_count.load(Ordering::Relaxed) as i64);
1404        self.partition_write_latency
1405            .with_label_values(&[&partition_id_str])
1406            .observe(latency.as_secs_f64());
1407    }
1408
1409    /// Record an error for a partition
1410    ///
1411    /// # Arguments
1412    /// * `partition_id` - The partition ID (0 to partition_count-1)
1413    #[inline]
1414    pub fn record_error(&self, partition_id: u32) {
1415        if partition_id >= self.partition_count {
1416            return;
1417        }
1418
1419        let entry = &self.partitions[partition_id as usize];
1420        entry.error_count.fetch_add(1, Ordering::Relaxed);
1421
1422        // Update Prometheus metrics
1423        let partition_id_str = partition_id.to_string();
1424        self.partition_errors_total
1425            .with_label_values(&[&partition_id_str])
1426            .inc();
1427    }
1428
1429    /// Record a batch write to a partition
1430    ///
1431    /// # Arguments
1432    /// * `partition_id` - The partition ID (0 to partition_count-1)
1433    /// * `count` - Number of events in the batch
1434    /// * `latency` - Total latency for the batch write
1435    #[inline]
1436    pub fn record_batch_write(&self, partition_id: u32, count: u64, latency: Duration) {
1437        if partition_id >= self.partition_count {
1438            return;
1439        }
1440
1441        let entry = &self.partitions[partition_id as usize];
1442        let latency_ns = latency.as_nanos() as u64;
1443
1444        // Update atomic counters
1445        entry.event_count.fetch_add(count, Ordering::Relaxed);
1446        entry.write_count.fetch_add(1, Ordering::Relaxed);
1447        entry
1448            .total_latency_ns
1449            .fetch_add(latency_ns, Ordering::Relaxed);
1450
1451        // Update min/max latency using per-event average
1452        let per_event_latency_ns = latency_ns / count.max(1);
1453
1454        // Update min latency
1455        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1456        while per_event_latency_ns < current_min {
1457            match entry.min_latency_ns.compare_exchange_weak(
1458                current_min,
1459                per_event_latency_ns,
1460                Ordering::Relaxed,
1461                Ordering::Relaxed,
1462            ) {
1463                Ok(_) => break,
1464                Err(actual) => current_min = actual,
1465            }
1466        }
1467
1468        // Update max latency
1469        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1470        while per_event_latency_ns > current_max {
1471            match entry.max_latency_ns.compare_exchange_weak(
1472                current_max,
1473                per_event_latency_ns,
1474                Ordering::Relaxed,
1475                Ordering::Relaxed,
1476            ) {
1477                Ok(_) => break,
1478                Err(actual) => current_max = actual,
1479            }
1480        }
1481
1482        // Update Prometheus metrics
1483        let partition_id_str = partition_id.to_string();
1484        self.partition_events_total
1485            .with_label_values(&[&partition_id_str])
1486            .set(entry.event_count.load(Ordering::Relaxed) as i64);
1487        self.partition_write_latency
1488            .with_label_values(&[&partition_id_str])
1489            .observe(latency.as_secs_f64());
1490    }
1491
1492    /// Get statistics for a specific partition
1493    pub fn get_partition_stats(&self, partition_id: u32) -> Option<PartitionStats> {
1494        if partition_id >= self.partition_count {
1495            return None;
1496        }
1497
1498        let entry = &self.partitions[partition_id as usize];
1499
1500        Some(PartitionStats {
1501            partition_id,
1502            event_count: entry.event_count.load(Ordering::Relaxed),
1503            total_latency_ns: entry.total_latency_ns.load(Ordering::Relaxed),
1504            write_count: entry.write_count.load(Ordering::Relaxed),
1505            min_latency_ns: entry.min_latency_ns.load(Ordering::Relaxed),
1506            max_latency_ns: entry.max_latency_ns.load(Ordering::Relaxed),
1507            error_count: entry.error_count.load(Ordering::Relaxed),
1508        })
1509    }
1510
1511    /// Get statistics for all partitions
1512    pub fn get_all_partition_stats(&self) -> Vec<PartitionStats> {
1513        (0..self.partition_count)
1514            .filter_map(|id| self.get_partition_stats(id))
1515            .collect()
1516    }
1517
1518    /// Detect partition imbalance (hot partitions)
1519    ///
1520    /// Returns alerts for any partition with >2x average event count.
1521    /// This is the SierraDB pattern for detecting skew and hot partitions.
1522    ///
1523    /// # Returns
1524    /// Vector of alerts for imbalanced partitions
1525    pub fn detect_partition_imbalance(&self) -> Vec<PartitionImbalanceAlert> {
1526        let mut alerts = Vec::new();
1527        let stats = self.get_all_partition_stats();
1528
1529        // Calculate total and average event count
1530        let total_events: u64 = stats.iter().map(|s| s.event_count).sum();
1531        let active_partitions = stats.iter().filter(|s| s.event_count > 0).count();
1532
1533        if active_partitions == 0 {
1534            return alerts;
1535        }
1536
1537        let average_count = total_events as f64 / active_partitions as f64;
1538        let imbalance_threshold = 2.0; // SierraDB threshold: 2x average
1539
1540        for stat in stats {
1541            if stat.event_count == 0 {
1542                continue;
1543            }
1544
1545            let ratio = stat.event_count as f64 / average_count;
1546
1547            if ratio > imbalance_threshold {
1548                alerts.push(PartitionImbalanceAlert {
1549                    partition_id: stat.partition_id,
1550                    event_count: stat.event_count,
1551                    average_count,
1552                    ratio_to_average: ratio,
1553                    message: format!(
1554                        "Partition {} has {:.1}x average load ({} events vs {:.0} avg)",
1555                        stat.partition_id, ratio, stat.event_count, average_count
1556                    ),
1557                    timestamp: chrono::Utc::now(),
1558                });
1559            }
1560        }
1561
1562        alerts
1563    }
1564
1565    /// Get partition count
1566    pub fn partition_count(&self) -> u32 {
1567        self.partition_count
1568    }
1569
1570    /// Get total events across all partitions
1571    pub fn total_events(&self) -> u64 {
1572        self.partitions
1573            .iter()
1574            .map(|e| e.event_count.load(Ordering::Relaxed))
1575            .sum()
1576    }
1577
1578    /// Get total errors across all partitions
1579    pub fn total_errors(&self) -> u64 {
1580        self.partitions
1581            .iter()
1582            .map(|e| e.error_count.load(Ordering::Relaxed))
1583            .sum()
1584    }
1585
1586    /// Get uptime since metrics collection started
1587    pub fn uptime(&self) -> Duration {
1588        self.started_at.elapsed()
1589    }
1590
1591    /// Get the Prometheus registry for this partition metrics
1592    pub fn registry(&self) -> &Registry {
1593        &self.registry
1594    }
1595
1596    /// Encode metrics in Prometheus text format
1597    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
1598        use prometheus::Encoder;
1599        let encoder = prometheus::TextEncoder::new();
1600        let metric_families = self.registry.gather();
1601        let mut buffer = Vec::new();
1602        encoder.encode(&metric_families, &mut buffer)?;
1603        Ok(String::from_utf8(buffer)?)
1604    }
1605
1606    /// Get partition distribution as a map
1607    pub fn get_distribution(&self) -> HashMap<u32, u64> {
1608        self.partitions
1609            .iter()
1610            .enumerate()
1611            .map(|(id, entry)| (id as u32, entry.event_count.load(Ordering::Relaxed)))
1612            .collect()
1613    }
1614
1615    /// Reset all partition metrics
1616    pub fn reset(&self) {
1617        for entry in &self.partitions {
1618            entry.event_count.store(0, Ordering::Relaxed);
1619            entry.total_latency_ns.store(0, Ordering::Relaxed);
1620            entry.write_count.store(0, Ordering::Relaxed);
1621            entry.min_latency_ns.store(u64::MAX, Ordering::Relaxed);
1622            entry.max_latency_ns.store(0, Ordering::Relaxed);
1623            entry.error_count.store(0, Ordering::Relaxed);
1624        }
1625    }
1626}
1627
1628impl Default for PartitionMetrics {
1629    fn default() -> Self {
1630        Self::with_default_partitions()
1631    }
1632}
1633
1634#[cfg(test)]
1635mod partition_tests {
1636    use super::*;
1637    use std::thread;
1638
1639    #[test]
1640    fn test_partition_metrics_creation() {
1641        let metrics = PartitionMetrics::new(32);
1642        assert_eq!(metrics.partition_count(), 32);
1643        assert_eq!(metrics.total_events(), 0);
1644        assert_eq!(metrics.total_errors(), 0);
1645    }
1646
1647    #[test]
1648    fn test_partition_metrics_default() {
1649        let metrics = PartitionMetrics::default();
1650        assert_eq!(metrics.partition_count(), 32);
1651    }
1652
1653    #[test]
1654    fn test_record_write() {
1655        let metrics = PartitionMetrics::new(32);
1656
1657        metrics.record_write(0, Duration::from_micros(100));
1658        metrics.record_write(0, Duration::from_micros(200));
1659        metrics.record_write(1, Duration::from_micros(150));
1660
1661        let stats0 = metrics.get_partition_stats(0).unwrap();
1662        assert_eq!(stats0.event_count, 2);
1663        assert_eq!(stats0.write_count, 2);
1664
1665        let stats1 = metrics.get_partition_stats(1).unwrap();
1666        assert_eq!(stats1.event_count, 1);
1667    }
1668
1669    #[test]
1670    fn test_record_batch_write() {
1671        let metrics = PartitionMetrics::new(32);
1672
1673        metrics.record_batch_write(5, 100, Duration::from_millis(10));
1674
1675        let stats = metrics.get_partition_stats(5).unwrap();
1676        assert_eq!(stats.event_count, 100);
1677        assert_eq!(stats.write_count, 1);
1678    }
1679
1680    #[test]
1681    fn test_record_error() {
1682        let metrics = PartitionMetrics::new(32);
1683
1684        metrics.record_error(3);
1685        metrics.record_error(3);
1686        metrics.record_error(5);
1687
1688        let stats3 = metrics.get_partition_stats(3).unwrap();
1689        assert_eq!(stats3.error_count, 2);
1690
1691        let stats5 = metrics.get_partition_stats(5).unwrap();
1692        assert_eq!(stats5.error_count, 1);
1693
1694        assert_eq!(metrics.total_errors(), 3);
1695    }
1696
1697    #[test]
1698    fn test_invalid_partition_id() {
1699        let metrics = PartitionMetrics::new(32);
1700
1701        // Should not panic, just ignore invalid partition IDs
1702        metrics.record_write(100, Duration::from_micros(100));
1703        metrics.record_error(100);
1704
1705        assert!(metrics.get_partition_stats(100).is_none());
1706    }
1707
1708    #[test]
1709    fn test_latency_tracking() {
1710        let metrics = PartitionMetrics::new(32);
1711
1712        metrics.record_write(0, Duration::from_micros(100));
1713        metrics.record_write(0, Duration::from_micros(200));
1714        metrics.record_write(0, Duration::from_micros(300));
1715
1716        let stats = metrics.get_partition_stats(0).unwrap();
1717        assert_eq!(stats.min_latency_ns, 100_000); // 100 microseconds in nanoseconds
1718        assert_eq!(stats.max_latency_ns, 300_000); // 300 microseconds in nanoseconds
1719
1720        let avg = stats.avg_latency().unwrap();
1721        assert_eq!(avg, Duration::from_nanos(200_000)); // Average: 200 microseconds
1722    }
1723
1724    #[test]
1725    fn test_detect_partition_imbalance_no_imbalance() {
1726        let metrics = PartitionMetrics::new(4);
1727
1728        // Distribute events evenly
1729        for i in 0..4 {
1730            for _ in 0..100 {
1731                metrics.record_write(i, Duration::from_micros(100));
1732            }
1733        }
1734
1735        let alerts = metrics.detect_partition_imbalance();
1736        assert!(
1737            alerts.is_empty(),
1738            "No alerts expected for balanced partitions"
1739        );
1740    }
1741
1742    #[test]
1743    fn test_detect_partition_imbalance_hot_partition() {
1744        let metrics = PartitionMetrics::new(4);
1745
1746        // Partition 0 gets 500 events, others get 100 each
1747        // Average = (500 + 100 + 100 + 100) / 4 = 200
1748        // Partition 0 ratio = 500/200 = 2.5x (>2x threshold)
1749        for _ in 0..500 {
1750            metrics.record_write(0, Duration::from_micros(100));
1751        }
1752        for i in 1..4 {
1753            for _ in 0..100 {
1754                metrics.record_write(i, Duration::from_micros(100));
1755            }
1756        }
1757
1758        let alerts = metrics.detect_partition_imbalance();
1759        assert_eq!(alerts.len(), 1, "Expected one alert for hot partition");
1760        assert_eq!(alerts[0].partition_id, 0);
1761        assert!(alerts[0].ratio_to_average > 2.0);
1762    }
1763
1764    #[test]
1765    fn test_detect_partition_imbalance_empty() {
1766        let metrics = PartitionMetrics::new(4);
1767
1768        let alerts = metrics.detect_partition_imbalance();
1769        assert!(alerts.is_empty(), "No alerts expected for empty metrics");
1770    }
1771
1772    #[test]
1773    fn test_get_all_partition_stats() {
1774        let metrics = PartitionMetrics::new(4);
1775
1776        metrics.record_write(0, Duration::from_micros(100));
1777        metrics.record_write(2, Duration::from_micros(200));
1778
1779        let all_stats = metrics.get_all_partition_stats();
1780        assert_eq!(all_stats.len(), 4);
1781        assert_eq!(all_stats[0].event_count, 1);
1782        assert_eq!(all_stats[1].event_count, 0);
1783        assert_eq!(all_stats[2].event_count, 1);
1784        assert_eq!(all_stats[3].event_count, 0);
1785    }
1786
1787    #[test]
1788    fn test_prometheus_encoding() {
1789        let metrics = PartitionMetrics::new(4);
1790
1791        metrics.record_write(0, Duration::from_micros(100));
1792        metrics.record_write(1, Duration::from_micros(200));
1793        metrics.record_error(0);
1794
1795        let encoded = metrics.encode().unwrap();
1796
1797        assert!(encoded.contains("allsource_partition_events_total"));
1798        assert!(encoded.contains("allsource_partition_write_latency"));
1799        assert!(encoded.contains("allsource_partition_errors_total"));
1800    }
1801
1802    #[test]
1803    fn test_reset() {
1804        let metrics = PartitionMetrics::new(4);
1805
1806        metrics.record_write(0, Duration::from_micros(100));
1807        metrics.record_error(1);
1808
1809        assert_eq!(metrics.total_events(), 1);
1810        assert_eq!(metrics.total_errors(), 1);
1811
1812        metrics.reset();
1813
1814        assert_eq!(metrics.total_events(), 0);
1815        assert_eq!(metrics.total_errors(), 0);
1816    }
1817
1818    #[test]
1819    fn test_concurrent_writes() {
1820        let metrics = Arc::new(PartitionMetrics::new(32));
1821        let mut handles = vec![];
1822
1823        // Spawn 8 threads, each writing 1000 events to random partitions
1824        for _ in 0..8 {
1825            let metrics_clone = metrics.clone();
1826            let handle = thread::spawn(move || {
1827                for i in 0..1000 {
1828                    let partition_id = (i % 32) as u32;
1829                    metrics_clone.record_write(partition_id, Duration::from_micros(100));
1830                }
1831            });
1832            handles.push(handle);
1833        }
1834
1835        for handle in handles {
1836            handle.join().unwrap();
1837        }
1838
1839        assert_eq!(metrics.total_events(), 8000);
1840    }
1841
1842    #[test]
1843    fn test_get_distribution() {
1844        let metrics = PartitionMetrics::new(4);
1845
1846        metrics.record_write(0, Duration::from_micros(100));
1847        metrics.record_write(0, Duration::from_micros(100));
1848        metrics.record_write(2, Duration::from_micros(100));
1849
1850        let distribution = metrics.get_distribution();
1851
1852        assert_eq!(distribution.get(&0), Some(&2));
1853        assert_eq!(distribution.get(&1), Some(&0));
1854        assert_eq!(distribution.get(&2), Some(&1));
1855        assert_eq!(distribution.get(&3), Some(&0));
1856    }
1857
1858    #[test]
1859    fn test_partition_stats_avg_latency_none() {
1860        let stats = PartitionStats::new(0);
1861        assert!(stats.avg_latency().is_none());
1862    }
1863
1864    #[test]
1865    fn test_alert_message_format() {
1866        let metrics = PartitionMetrics::new(4);
1867
1868        // Create imbalanced scenario
1869        for _ in 0..1000 {
1870            metrics.record_write(0, Duration::from_micros(100));
1871        }
1872        for i in 1..4 {
1873            for _ in 0..100 {
1874                metrics.record_write(i, Duration::from_micros(100));
1875            }
1876        }
1877
1878        let alerts = metrics.detect_partition_imbalance();
1879        assert!(!alerts.is_empty());
1880
1881        let alert = &alerts[0];
1882        assert!(alert.message.contains("Partition 0"));
1883        assert!(alert.message.contains("average load"));
1884    }
1885
1886    #[test]
1887    fn test_uptime() {
1888        let metrics = PartitionMetrics::new(4);
1889
1890        // Sleep a bit to ensure non-zero uptime
1891        thread::sleep(Duration::from_millis(10));
1892
1893        let uptime = metrics.uptime();
1894        assert!(uptime.as_millis() >= 10);
1895    }
1896}