//! Centralized observability metrics for AllSource.
//! Path: allsource_core/infrastructure/observability/metrics.rs
use prometheus::{
    Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
    Registry,
};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

11/// Centralized metrics registry for AllSource
12pub struct MetricsRegistry {
13    /// Prometheus registry
14    registry: Registry,
15
16    // Event ingestion metrics
17    pub events_ingested_total: IntCounter,
18    pub events_ingested_by_type: IntCounterVec,
19    pub ingestion_duration_seconds: Histogram,
20    pub ingestion_errors_total: IntCounter,
21
22    // Query metrics
23    pub queries_total: IntCounterVec,
24    pub query_duration_seconds: HistogramVec,
25    pub query_results_total: IntCounterVec,
26
27    // Storage metrics
28    pub storage_events_total: IntGauge,
29    pub storage_entities_total: IntGauge,
30    pub storage_size_bytes: IntGauge,
31    pub parquet_files_total: IntGauge,
32    pub wal_segments_total: IntGauge,
33
34    // Projection metrics
35    pub projections_total: IntGauge,
36    pub projection_events_processed: IntCounterVec,
37    pub projection_errors_total: IntCounterVec,
38    pub projection_processing_duration: HistogramVec,
39    pub projection_duration_seconds: Histogram,
40
41    // Schema registry metrics (v0.5)
42    pub schemas_registered_total: IntCounter,
43    pub schema_validations_total: IntCounterVec,
44    pub schema_validation_duration: Histogram,
45
46    // Replay metrics (v0.5)
47    pub replays_started_total: IntCounter,
48    pub replays_completed_total: IntCounter,
49    pub replays_failed_total: IntCounter,
50    pub replay_events_processed: IntCounter,
51    pub replay_duration_seconds: Histogram,
52
53    // Pipeline metrics (v0.5)
54    pub pipelines_registered_total: IntGauge,
55    pub pipeline_events_processed: IntCounterVec,
56    pub pipeline_events_filtered: IntCounterVec,
57    pub pipeline_errors_total: IntCounterVec,
58    pub pipeline_processing_duration: HistogramVec,
59    pub pipeline_duration_seconds: Histogram,
60
61    // Snapshot metrics
62    pub snapshots_created_total: IntCounter,
63    pub snapshot_creation_duration: Histogram,
64    pub snapshots_total: IntGauge,
65
66    // Compaction metrics
67    pub compactions_total: IntCounter,
68    pub compaction_duration_seconds: Histogram,
69    pub compaction_files_merged: IntCounter,
70    pub compaction_bytes_saved: IntCounter,
71
72    // WebSocket metrics
73    pub websocket_connections_active: IntGauge,
74    pub websocket_connections_total: IntCounter,
75    pub websocket_messages_sent: IntCounter,
76    pub websocket_errors_total: IntCounter,
77
78    // System metrics
79    pub http_requests_total: IntCounterVec,
80    pub http_request_duration_seconds: HistogramVec,
81    pub http_requests_in_flight: IntGauge,
82}
83
84impl MetricsRegistry {
85    pub fn new() -> Arc<Self> {
86        let registry = Registry::new();
87
88        // Event ingestion metrics
89        let events_ingested_total = IntCounter::with_opts(Opts::new(
90            "allsource_events_ingested_total",
91            "Total number of events ingested",
92        ))
93        .unwrap();
94
95        let events_ingested_by_type = IntCounterVec::new(
96            Opts::new(
97                "allsource_events_ingested_by_type",
98                "Events ingested by type",
99            ),
100            &["event_type"],
101        )
102        .unwrap();
103
104        let ingestion_duration_seconds = Histogram::with_opts(HistogramOpts::new(
105            "allsource_ingestion_duration_seconds",
106            "Event ingestion duration in seconds",
107        ))
108        .unwrap();
109
110        let ingestion_errors_total = IntCounter::with_opts(Opts::new(
111            "allsource_ingestion_errors_total",
112            "Total number of ingestion errors",
113        ))
114        .unwrap();
115
116        // Query metrics
117        let queries_total = IntCounterVec::new(
118            Opts::new("allsource_queries_total", "Total number of queries"),
119            &["query_type"],
120        )
121        .unwrap();
122
123        let query_duration_seconds = HistogramVec::new(
124            HistogramOpts::new(
125                "allsource_query_duration_seconds",
126                "Query duration in seconds",
127            ),
128            &["query_type"],
129        )
130        .unwrap();
131
132        let query_results_total = IntCounterVec::new(
133            Opts::new(
134                "allsource_query_results_total",
135                "Total number of events returned by queries",
136            ),
137            &["query_type"],
138        )
139        .unwrap();
140
141        // Storage metrics
142        let storage_events_total = IntGauge::with_opts(Opts::new(
143            "allsource_storage_events_total",
144            "Total number of events in storage",
145        ))
146        .unwrap();
147
148        let storage_entities_total = IntGauge::with_opts(Opts::new(
149            "allsource_storage_entities_total",
150            "Total number of entities in storage",
151        ))
152        .unwrap();
153
154        let storage_size_bytes = IntGauge::with_opts(Opts::new(
155            "allsource_storage_size_bytes",
156            "Total storage size in bytes",
157        ))
158        .unwrap();
159
160        let parquet_files_total = IntGauge::with_opts(Opts::new(
161            "allsource_parquet_files_total",
162            "Number of Parquet files",
163        ))
164        .unwrap();
165
166        let wal_segments_total = IntGauge::with_opts(Opts::new(
167            "allsource_wal_segments_total",
168            "Number of WAL segments",
169        ))
170        .unwrap();
171
172        // Projection metrics
173        let projection_events_processed = IntCounterVec::new(
174            Opts::new(
175                "allsource_projection_events_processed",
176                "Events processed by projections",
177            ),
178            &["projection_name"],
179        )
180        .unwrap();
181
182        let projection_errors_total = IntCounterVec::new(
183            Opts::new(
184                "allsource_projection_errors_total",
185                "Total projection errors",
186            ),
187            &["projection_name"],
188        )
189        .unwrap();
190
191        let projection_processing_duration = HistogramVec::new(
192            HistogramOpts::new(
193                "allsource_projection_processing_duration_seconds",
194                "Projection processing duration",
195            ),
196            &["projection_name"],
197        )
198        .unwrap();
199
200        let projections_total = IntGauge::with_opts(Opts::new(
201            "allsource_projections_total",
202            "Number of registered projections",
203        ))
204        .unwrap();
205
206        let projection_duration_seconds = Histogram::with_opts(HistogramOpts::new(
207            "allsource_projection_duration_seconds",
208            "Overall projection manager processing duration",
209        ))
210        .unwrap();
211
212        // Schema registry metrics (v0.5)
213        let schemas_registered_total = IntCounter::with_opts(Opts::new(
214            "allsource_schemas_registered_total",
215            "Total number of schemas registered",
216        ))
217        .unwrap();
218
219        let schema_validations_total = IntCounterVec::new(
220            Opts::new(
221                "allsource_schema_validations_total",
222                "Schema validations by result",
223            ),
224            &["subject", "result"],
225        )
226        .unwrap();
227
228        let schema_validation_duration = Histogram::with_opts(HistogramOpts::new(
229            "allsource_schema_validation_duration_seconds",
230            "Schema validation duration",
231        ))
232        .unwrap();
233
234        // Replay metrics (v0.5)
235        let replays_started_total = IntCounter::with_opts(Opts::new(
236            "allsource_replays_started_total",
237            "Total replays started",
238        ))
239        .unwrap();
240
241        let replays_completed_total = IntCounter::with_opts(Opts::new(
242            "allsource_replays_completed_total",
243            "Total replays completed",
244        ))
245        .unwrap();
246
247        let replays_failed_total = IntCounter::with_opts(Opts::new(
248            "allsource_replays_failed_total",
249            "Total replays failed",
250        ))
251        .unwrap();
252
253        let replay_events_processed = IntCounter::with_opts(Opts::new(
254            "allsource_replay_events_processed",
255            "Events processed during replays",
256        ))
257        .unwrap();
258
259        let replay_duration_seconds = Histogram::with_opts(HistogramOpts::new(
260            "allsource_replay_duration_seconds",
261            "Replay duration",
262        ))
263        .unwrap();
264
265        // Pipeline metrics (v0.5)
266        let pipelines_registered_total = IntGauge::with_opts(Opts::new(
267            "allsource_pipelines_registered_total",
268            "Number of registered pipelines",
269        ))
270        .unwrap();
271
272        let pipeline_events_processed = IntCounterVec::new(
273            Opts::new(
274                "allsource_pipeline_events_processed",
275                "Events processed by pipelines",
276            ),
277            &["pipeline_id", "pipeline_name"],
278        )
279        .unwrap();
280
281        let pipeline_events_filtered = IntCounterVec::new(
282            Opts::new(
283                "allsource_pipeline_events_filtered",
284                "Events filtered by pipelines",
285            ),
286            &["pipeline_id", "pipeline_name"],
287        )
288        .unwrap();
289
290        let pipeline_processing_duration = HistogramVec::new(
291            HistogramOpts::new(
292                "allsource_pipeline_processing_duration_seconds",
293                "Pipeline processing duration",
294            ),
295            &["pipeline_id", "pipeline_name"],
296        )
297        .unwrap();
298
299        let pipeline_errors_total = IntCounterVec::new(
300            Opts::new("allsource_pipeline_errors_total", "Total pipeline errors"),
301            &["pipeline_name"],
302        )
303        .unwrap();
304
305        let pipeline_duration_seconds = Histogram::with_opts(HistogramOpts::new(
306            "allsource_pipeline_duration_seconds",
307            "Overall pipeline manager processing duration",
308        ))
309        .unwrap();
310
311        // Snapshot metrics
312        let snapshots_created_total = IntCounter::with_opts(Opts::new(
313            "allsource_snapshots_created_total",
314            "Total snapshots created",
315        ))
316        .unwrap();
317
318        let snapshot_creation_duration = Histogram::with_opts(HistogramOpts::new(
319            "allsource_snapshot_creation_duration_seconds",
320            "Snapshot creation duration",
321        ))
322        .unwrap();
323
324        let snapshots_total = IntGauge::with_opts(Opts::new(
325            "allsource_snapshots_total",
326            "Total number of snapshots",
327        ))
328        .unwrap();
329
330        // Compaction metrics
331        let compactions_total = IntCounter::with_opts(Opts::new(
332            "allsource_compactions_total",
333            "Total compactions performed",
334        ))
335        .unwrap();
336
337        let compaction_duration_seconds = Histogram::with_opts(HistogramOpts::new(
338            "allsource_compaction_duration_seconds",
339            "Compaction duration",
340        ))
341        .unwrap();
342
343        let compaction_files_merged = IntCounter::with_opts(Opts::new(
344            "allsource_compaction_files_merged",
345            "Files merged during compaction",
346        ))
347        .unwrap();
348
349        let compaction_bytes_saved = IntCounter::with_opts(Opts::new(
350            "allsource_compaction_bytes_saved",
351            "Bytes saved by compaction",
352        ))
353        .unwrap();
354
355        // WebSocket metrics
356        let websocket_connections_active = IntGauge::with_opts(Opts::new(
357            "allsource_websocket_connections_active",
358            "Active WebSocket connections",
359        ))
360        .unwrap();
361
362        let websocket_connections_total = IntCounter::with_opts(Opts::new(
363            "allsource_websocket_connections_total",
364            "Total WebSocket connections",
365        ))
366        .unwrap();
367
368        let websocket_messages_sent = IntCounter::with_opts(Opts::new(
369            "allsource_websocket_messages_sent",
370            "WebSocket messages sent",
371        ))
372        .unwrap();
373
374        let websocket_errors_total = IntCounter::with_opts(Opts::new(
375            "allsource_websocket_errors_total",
376            "WebSocket errors",
377        ))
378        .unwrap();
379
380        // System metrics
381        let http_requests_total = IntCounterVec::new(
382            Opts::new("allsource_http_requests_total", "Total HTTP requests"),
383            &["method", "endpoint", "status"],
384        )
385        .unwrap();
386
387        let http_request_duration_seconds = HistogramVec::new(
388            HistogramOpts::new(
389                "allsource_http_request_duration_seconds",
390                "HTTP request duration",
391            ),
392            &["method", "endpoint"],
393        )
394        .unwrap();
395
396        let http_requests_in_flight = IntGauge::with_opts(Opts::new(
397            "allsource_http_requests_in_flight",
398            "HTTP requests currently being processed",
399        ))
400        .unwrap();
401
402        // Register all metrics
403        registry
404            .register(Box::new(events_ingested_total.clone()))
405            .unwrap();
406        registry
407            .register(Box::new(events_ingested_by_type.clone()))
408            .unwrap();
409        registry
410            .register(Box::new(ingestion_duration_seconds.clone()))
411            .unwrap();
412        registry
413            .register(Box::new(ingestion_errors_total.clone()))
414            .unwrap();
415
416        registry.register(Box::new(queries_total.clone())).unwrap();
417        registry
418            .register(Box::new(query_duration_seconds.clone()))
419            .unwrap();
420        registry
421            .register(Box::new(query_results_total.clone()))
422            .unwrap();
423
424        registry
425            .register(Box::new(storage_events_total.clone()))
426            .unwrap();
427        registry
428            .register(Box::new(storage_entities_total.clone()))
429            .unwrap();
430        registry
431            .register(Box::new(storage_size_bytes.clone()))
432            .unwrap();
433        registry
434            .register(Box::new(parquet_files_total.clone()))
435            .unwrap();
436        registry
437            .register(Box::new(wal_segments_total.clone()))
438            .unwrap();
439
440        registry
441            .register(Box::new(projection_events_processed.clone()))
442            .unwrap();
443        registry
444            .register(Box::new(projection_errors_total.clone()))
445            .unwrap();
446        registry
447            .register(Box::new(projection_processing_duration.clone()))
448            .unwrap();
449        registry
450            .register(Box::new(projections_total.clone()))
451            .unwrap();
452        registry
453            .register(Box::new(projection_duration_seconds.clone()))
454            .unwrap();
455
456        registry
457            .register(Box::new(schemas_registered_total.clone()))
458            .unwrap();
459        registry
460            .register(Box::new(schema_validations_total.clone()))
461            .unwrap();
462        registry
463            .register(Box::new(schema_validation_duration.clone()))
464            .unwrap();
465
466        registry
467            .register(Box::new(replays_started_total.clone()))
468            .unwrap();
469        registry
470            .register(Box::new(replays_completed_total.clone()))
471            .unwrap();
472        registry
473            .register(Box::new(replays_failed_total.clone()))
474            .unwrap();
475        registry
476            .register(Box::new(replay_events_processed.clone()))
477            .unwrap();
478        registry
479            .register(Box::new(replay_duration_seconds.clone()))
480            .unwrap();
481
482        registry
483            .register(Box::new(pipelines_registered_total.clone()))
484            .unwrap();
485        registry
486            .register(Box::new(pipeline_events_processed.clone()))
487            .unwrap();
488        registry
489            .register(Box::new(pipeline_events_filtered.clone()))
490            .unwrap();
491        registry
492            .register(Box::new(pipeline_processing_duration.clone()))
493            .unwrap();
494        registry
495            .register(Box::new(pipeline_errors_total.clone()))
496            .unwrap();
497        registry
498            .register(Box::new(pipeline_duration_seconds.clone()))
499            .unwrap();
500
501        registry
502            .register(Box::new(snapshots_created_total.clone()))
503            .unwrap();
504        registry
505            .register(Box::new(snapshot_creation_duration.clone()))
506            .unwrap();
507        registry
508            .register(Box::new(snapshots_total.clone()))
509            .unwrap();
510
511        registry
512            .register(Box::new(compactions_total.clone()))
513            .unwrap();
514        registry
515            .register(Box::new(compaction_duration_seconds.clone()))
516            .unwrap();
517        registry
518            .register(Box::new(compaction_files_merged.clone()))
519            .unwrap();
520        registry
521            .register(Box::new(compaction_bytes_saved.clone()))
522            .unwrap();
523
524        registry
525            .register(Box::new(websocket_connections_active.clone()))
526            .unwrap();
527        registry
528            .register(Box::new(websocket_connections_total.clone()))
529            .unwrap();
530        registry
531            .register(Box::new(websocket_messages_sent.clone()))
532            .unwrap();
533        registry
534            .register(Box::new(websocket_errors_total.clone()))
535            .unwrap();
536
537        registry
538            .register(Box::new(http_requests_total.clone()))
539            .unwrap();
540        registry
541            .register(Box::new(http_request_duration_seconds.clone()))
542            .unwrap();
543        registry
544            .register(Box::new(http_requests_in_flight.clone()))
545            .unwrap();
546
547        Arc::new(Self {
548            registry,
549            events_ingested_total,
550            events_ingested_by_type,
551            ingestion_duration_seconds,
552            ingestion_errors_total,
553            queries_total,
554            query_duration_seconds,
555            query_results_total,
556            storage_events_total,
557            storage_entities_total,
558            storage_size_bytes,
559            parquet_files_total,
560            wal_segments_total,
561            projection_events_processed,
562            projection_errors_total,
563            projection_processing_duration,
564            projections_total,
565            projection_duration_seconds,
566            schemas_registered_total,
567            schema_validations_total,
568            schema_validation_duration,
569            replays_started_total,
570            replays_completed_total,
571            replays_failed_total,
572            replay_events_processed,
573            replay_duration_seconds,
574            pipelines_registered_total,
575            pipeline_events_processed,
576            pipeline_events_filtered,
577            pipeline_processing_duration,
578            pipeline_errors_total,
579            pipeline_duration_seconds,
580            snapshots_created_total,
581            snapshot_creation_duration,
582            snapshots_total,
583            compactions_total,
584            compaction_duration_seconds,
585            compaction_files_merged,
586            compaction_bytes_saved,
587            websocket_connections_active,
588            websocket_connections_total,
589            websocket_messages_sent,
590            websocket_errors_total,
591            http_requests_total,
592            http_request_duration_seconds,
593            http_requests_in_flight,
594        })
595    }
596
597    /// Get the Prometheus registry
598    pub fn registry(&self) -> &Registry {
599        &self.registry
600    }
601
602    /// Encode metrics in Prometheus text format
603    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
604        use prometheus::Encoder;
605        let encoder = prometheus::TextEncoder::new();
606        let metric_families = self.registry.gather();
607        let mut buffer = Vec::new();
608        encoder.encode(&metric_families, &mut buffer)?;
609        Ok(String::from_utf8(buffer)?)
610    }
611}

// NOTE: `Clone` and `Default` are intentionally NOT implemented for `MetricsRegistry`.
// Share one instance across the application via `Arc<MetricsRegistry>`; creating a
// second registry would duplicate every metric, which is incorrect.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_registry_creation() {
        let metrics = MetricsRegistry::new();
        assert_eq!(metrics.events_ingested_total.get(), 0);
        assert_eq!(metrics.storage_events_total.get(), 0);
    }

    #[test]
    fn test_event_ingestion_metrics() {
        let metrics = MetricsRegistry::new();

        // Increment ingestion counter
        metrics.events_ingested_total.inc();
        assert_eq!(metrics.events_ingested_total.get(), 1);

        // Increment by type
        metrics
            .events_ingested_by_type
            .with_label_values(&["user.created"])
            .inc();
        assert_eq!(
            metrics
                .events_ingested_by_type
                .with_label_values(&["user.created"])
                .get(),
            1
        );

        // Record duration
        metrics.ingestion_duration_seconds.observe(0.1);
    }

    #[test]
    fn test_query_metrics() {
        let metrics = MetricsRegistry::new();

        // Increment query counter
        metrics
            .queries_total
            .with_label_values(&["entity_id"])
            .inc();
        assert_eq!(
            metrics
                .queries_total
                .with_label_values(&["entity_id"])
                .get(),
            1
        );

        // Record query duration
        metrics
            .query_duration_seconds
            .with_label_values(&["entity_id"])
            .observe(0.05);

        // Record query results
        metrics
            .query_results_total
            .with_label_values(&["entity_id"])
            .inc_by(10);
    }

    #[test]
    fn test_storage_metrics() {
        let metrics = MetricsRegistry::new();

        // Set storage metrics
        metrics.storage_events_total.set(1000);
        assert_eq!(metrics.storage_events_total.get(), 1000);

        metrics.storage_entities_total.set(50);
        assert_eq!(metrics.storage_entities_total.get(), 50);

        metrics.storage_size_bytes.set(1024 * 1024);
        assert_eq!(metrics.storage_size_bytes.get(), 1024 * 1024);

        metrics.parquet_files_total.set(5);
        metrics.wal_segments_total.set(3);
    }

    #[test]
    fn test_projection_metrics() {
        let metrics = MetricsRegistry::new();

        // Set projections total
        metrics.projections_total.set(3);
        assert_eq!(metrics.projections_total.get(), 3);

        // Process events in projection
        metrics
            .projection_events_processed
            .with_label_values(&["user_snapshot"])
            .inc_by(100);

        // Record processing duration
        metrics
            .projection_processing_duration
            .with_label_values(&["user_snapshot"])
            .observe(0.2);

        // Record errors
        metrics
            .projection_errors_total
            .with_label_values(&["user_snapshot"])
            .inc();
    }

    #[test]
    fn test_schema_metrics() {
        let metrics = MetricsRegistry::new();

        // Register schema
        metrics.schemas_registered_total.inc();
        assert_eq!(metrics.schemas_registered_total.get(), 1);

        // Validation success - requires both subject and result labels
        metrics
            .schema_validations_total
            .with_label_values(&["user.schema", "success"])
            .inc();

        // Validation failure
        metrics
            .schema_validations_total
            .with_label_values(&["order.schema", "failure"])
            .inc();

        // Record validation duration
        metrics.schema_validation_duration.observe(0.01);
    }

    #[test]
    fn test_replay_metrics() {
        let metrics = MetricsRegistry::new();

        // Start replay
        metrics.replays_started_total.inc();
        assert_eq!(metrics.replays_started_total.get(), 1);

        // Process events
        metrics.replay_events_processed.inc_by(500);
        assert_eq!(metrics.replay_events_processed.get(), 500);

        // Complete replay
        metrics.replays_completed_total.inc();
        assert_eq!(metrics.replays_completed_total.get(), 1);

        // Record duration
        metrics.replay_duration_seconds.observe(5.5);
    }

    #[test]
    fn test_pipeline_metrics() {
        let metrics = MetricsRegistry::new();

        // Register pipeline
        metrics.pipelines_registered_total.set(2);
        assert_eq!(metrics.pipelines_registered_total.get(), 2);

        // Process events - requires both pipeline_id and pipeline_name labels
        metrics
            .pipeline_events_processed
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .inc_by(250);

        // Record errors - only requires pipeline_name
        metrics
            .pipeline_errors_total
            .with_label_values(&["filter_pipeline"])
            .inc();

        // Record duration - requires both pipeline_id and pipeline_name labels
        metrics
            .pipeline_processing_duration
            .with_label_values(&["pipeline-1", "filter_pipeline"])
            .observe(0.15);
    }

    #[test]
    fn test_metrics_encode() {
        let metrics = MetricsRegistry::new();

        // Add some data
        metrics.events_ingested_total.inc_by(100);
        metrics.storage_events_total.set(1000);

        // Encode to Prometheus format
        let encoded = metrics.encode().unwrap();

        // Verify output contains metrics
        assert!(encoded.contains("events_ingested_total"));
        assert!(encoded.contains("storage_events_total"));
    }

    #[test]
    fn test_metrics_default() {
        let metrics = MetricsRegistry::new();
        assert_eq!(metrics.events_ingested_total.get(), 0);
    }

    #[test]
    fn test_websocket_metrics() {
        let metrics = MetricsRegistry::new();

        // Connect client
        metrics.websocket_connections_active.inc();
        assert_eq!(metrics.websocket_connections_active.get(), 1);

        // Total connections
        metrics.websocket_connections_total.inc();

        // Broadcast message
        metrics.websocket_messages_sent.inc_by(10);
        assert_eq!(metrics.websocket_messages_sent.get(), 10);

        // Disconnect client
        metrics.websocket_connections_active.dec();
        assert_eq!(metrics.websocket_connections_active.get(), 0);

        // Record error
        metrics.websocket_errors_total.inc();
    }

    #[test]
    fn test_compaction_metrics() {
        let metrics = MetricsRegistry::new();

        // Start compaction
        metrics.compactions_total.inc();
        assert_eq!(metrics.compactions_total.get(), 1);

        // Record duration
        metrics.compaction_duration_seconds.observe(5.2);

        // Files merged
        metrics.compaction_files_merged.inc_by(5);

        // Bytes saved
        metrics.compaction_bytes_saved.inc_by(1024 * 1024);
    }

    #[test]
    fn test_snapshot_metrics() {
        let metrics = MetricsRegistry::new();

        // Create snapshot
        metrics.snapshots_created_total.inc();
        assert_eq!(metrics.snapshots_created_total.get(), 1);

        // Record duration
        metrics.snapshot_creation_duration.observe(0.5);

        // Total snapshots
        metrics.snapshots_total.set(10);
        assert_eq!(metrics.snapshots_total.get(), 10);
    }

    #[test]
    fn test_http_metrics() {
        let metrics = MetricsRegistry::new();

        // Record request
        metrics
            .http_requests_total
            .with_label_values(&["GET", "/api/events", "200"])
            .inc();

        // Record duration
        metrics
            .http_request_duration_seconds
            .with_label_values(&["GET", "/api/events"])
            .observe(0.025);

        // In-flight requests
        metrics.http_requests_in_flight.inc();
        assert_eq!(metrics.http_requests_in_flight.get(), 1);

        metrics.http_requests_in_flight.dec();
        assert_eq!(metrics.http_requests_in_flight.get(), 0);
    }
}

// =============================================================================
// Partition Metrics (SierraDB pattern)
// =============================================================================

/// Per-partition statistics for detecting hot partitions and skew.
///
/// SierraDB uses 32 fixed partitions for single-node deployments and 1024+
/// for clusters. This struct tracks metrics per partition so imbalances can
/// be detected.
#[derive(Debug)]
pub struct PartitionStats {
    /// Partition ID (0 to partition_count - 1).
    pub partition_id: u32,

    /// Total events written to this partition.
    pub event_count: u64,

    /// Sum of write latencies (nanoseconds), used to compute the average.
    pub total_latency_ns: u64,

    /// Number of writes, used to compute the average latency.
    pub write_count: u64,

    /// Minimum write latency (nanoseconds); `u64::MAX` until the first
    /// write is recorded.
    pub min_latency_ns: u64,

    /// Maximum write latency (nanoseconds).
    pub max_latency_ns: u64,

    /// Total error count for this partition.
    pub error_count: u64,
}

935impl PartitionStats {
936    fn new(partition_id: u32) -> Self {
937        Self {
938            partition_id,
939            event_count: 0,
940            total_latency_ns: 0,
941            write_count: 0,
942            min_latency_ns: u64::MAX,
943            max_latency_ns: 0,
944            error_count: 0,
945        }
946    }
947
948    /// Calculate average write latency
949    pub fn avg_latency(&self) -> Option<Duration> {
950        if self.write_count == 0 {
951            None
952        } else {
953            Some(Duration::from_nanos(self.total_latency_ns / self.write_count))
954        }
955    }
956}
957
/// Alert generated when partition imbalance is detected
///
/// Emitted by `PartitionMetrics::detect_partition_imbalance` for every
/// partition whose event count exceeds 2x the average event count.
#[derive(Debug, Clone, Serialize)]
pub struct PartitionImbalanceAlert {
    /// Partition ID that is imbalanced
    pub partition_id: u32,

    /// Event count for this partition
    pub event_count: u64,

    /// Average event count across all partitions
    /// (only partitions with at least one event are included in the average)
    pub average_count: f64,

    /// Ratio compared to average (>2.0 triggers alert)
    pub ratio_to_average: f64,

    /// Human-readable alert message describing the imbalance
    pub message: String,

    /// Timestamp (UTC) when alert was generated
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
979
/// Internal structure for tracking partition metrics atomically
///
/// All fields are updated with `Ordering::Relaxed` loads/stores/RMWs:
/// each counter is independent, so no cross-field ordering is required.
/// Field semantics mirror the public snapshot type `PartitionStats`.
struct PartitionMetricsEntry {
    event_count: AtomicU64,
    total_latency_ns: AtomicU64,
    write_count: AtomicU64,
    min_latency_ns: AtomicU64, // initialized to u64::MAX so the first sample wins
    max_latency_ns: AtomicU64, // initialized to 0 so the first sample wins
    error_count: AtomicU64,
}
989
990impl PartitionMetricsEntry {
991    fn new() -> Self {
992        Self {
993            event_count: AtomicU64::new(0),
994            total_latency_ns: AtomicU64::new(0),
995            write_count: AtomicU64::new(0),
996            min_latency_ns: AtomicU64::new(u64::MAX),
997            max_latency_ns: AtomicU64::new(0),
998            error_count: AtomicU64::new(0),
999        }
1000    }
1001}
1002
/// Partition monitoring for detecting hot partitions and skew (SierraDB pattern)
///
/// # Design Pattern
/// Uses atomic operations for lock-free metric updates per partition.
/// Tracks event counts, write latencies, and error rates per partition.
///
/// # SierraDB Context
/// SierraDB uses fixed partitions (32 for single-node, 1024+ for clusters).
/// Detecting hot partitions is critical for:
/// - Load balancing decisions
/// - Identifying skewed hash functions
/// - Capacity planning
/// - Performance troubleshooting
///
/// # Imbalance Detection
/// A partition is considered imbalanced if it has >2x the average load.
/// This threshold is based on SierraDB's experience with production workloads.
///
/// # Example
/// ```ignore
/// let partition_metrics = PartitionMetrics::new(32);
///
/// // Record write to partition 5
/// let start = Instant::now();
/// // ... write operation ...
/// partition_metrics.record_write(5, start.elapsed());
///
/// // Check for imbalances
/// let alerts = partition_metrics.detect_partition_imbalance();
/// for alert in alerts {
///     tracing::warn!("Partition imbalance: {}", alert.message);
/// }
/// ```
pub struct PartitionMetrics {
    /// Number of partitions (fixed at construction time)
    partition_count: u32,

    /// Per-partition metrics, indexed by partition ID; lock-free atomics
    partitions: Vec<PartitionMetricsEntry>,

    /// Prometheus gauge for per-partition event counts
    partition_events_total: IntGaugeVec,

    /// Prometheus histogram for per-partition write latency (seconds)
    partition_write_latency: HistogramVec,

    /// Prometheus counter for per-partition error counts
    partition_errors_total: IntCounterVec,

    /// Private Prometheus registry holding the collectors above
    registry: Registry,

    /// Timestamp when metrics collection started (used by `uptime()`)
    started_at: Instant,
}
1058
1059impl PartitionMetrics {
1060    /// Create a new partition metrics tracker
1061    ///
1062    /// # Arguments
1063    /// * `partition_count` - Number of partitions (default: 32 for single-node)
1064    pub fn new(partition_count: u32) -> Self {
1065        let registry = Registry::new();
1066
1067        // Per-partition event count gauge
1068        let partition_events_total = IntGaugeVec::new(
1069            Opts::new(
1070                "allsource_partition_events_total",
1071                "Total events per partition",
1072            ),
1073            &["partition_id"],
1074        )
1075        .expect("Failed to create partition_events_total metric");
1076
1077        // Per-partition write latency histogram
1078        let partition_write_latency = HistogramVec::new(
1079            HistogramOpts::new(
1080                "allsource_partition_write_latency_seconds",
1081                "Write latency per partition in seconds",
1082            )
1083            .buckets(vec![
1084                0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
1085            ]),
1086            &["partition_id"],
1087        )
1088        .expect("Failed to create partition_write_latency metric");
1089
1090        // Per-partition error counter
1091        let partition_errors_total = IntCounterVec::new(
1092            Opts::new(
1093                "allsource_partition_errors_total",
1094                "Total errors per partition",
1095            ),
1096            &["partition_id"],
1097        )
1098        .expect("Failed to create partition_errors_total metric");
1099
1100        // Register metrics
1101        registry
1102            .register(Box::new(partition_events_total.clone()))
1103            .expect("Failed to register partition_events_total");
1104        registry
1105            .register(Box::new(partition_write_latency.clone()))
1106            .expect("Failed to register partition_write_latency");
1107        registry
1108            .register(Box::new(partition_errors_total.clone()))
1109            .expect("Failed to register partition_errors_total");
1110
1111        // Initialize per-partition atomic counters
1112        let partitions = (0..partition_count)
1113            .map(|_| PartitionMetricsEntry::new())
1114            .collect();
1115
1116        Self {
1117            partition_count,
1118            partitions,
1119            partition_events_total,
1120            partition_write_latency,
1121            partition_errors_total,
1122            registry,
1123            started_at: Instant::now(),
1124        }
1125    }
1126
1127    /// Create partition metrics with default partition count (32)
1128    pub fn with_default_partitions() -> Self {
1129        Self::new(32)
1130    }
1131
1132    /// Record a successful write to a partition
1133    ///
1134    /// # Arguments
1135    /// * `partition_id` - The partition ID (0 to partition_count-1)
1136    /// * `latency` - The write latency
1137    #[inline]
1138    pub fn record_write(&self, partition_id: u32, latency: Duration) {
1139        if partition_id >= self.partition_count {
1140            return;
1141        }
1142
1143        let entry = &self.partitions[partition_id as usize];
1144        let latency_ns = latency.as_nanos() as u64;
1145
1146        // Update atomic counters
1147        entry.event_count.fetch_add(1, Ordering::Relaxed);
1148        entry.write_count.fetch_add(1, Ordering::Relaxed);
1149        entry.total_latency_ns.fetch_add(latency_ns, Ordering::Relaxed);
1150
1151        // Update min latency (compare-and-swap loop)
1152        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1153        while latency_ns < current_min {
1154            match entry.min_latency_ns.compare_exchange_weak(
1155                current_min,
1156                latency_ns,
1157                Ordering::Relaxed,
1158                Ordering::Relaxed,
1159            ) {
1160                Ok(_) => break,
1161                Err(actual) => current_min = actual,
1162            }
1163        }
1164
1165        // Update max latency (compare-and-swap loop)
1166        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1167        while latency_ns > current_max {
1168            match entry.max_latency_ns.compare_exchange_weak(
1169                current_max,
1170                latency_ns,
1171                Ordering::Relaxed,
1172                Ordering::Relaxed,
1173            ) {
1174                Ok(_) => break,
1175                Err(actual) => current_max = actual,
1176            }
1177        }
1178
1179        // Update Prometheus metrics
1180        let partition_id_str = partition_id.to_string();
1181        self.partition_events_total
1182            .with_label_values(&[&partition_id_str])
1183            .set(entry.event_count.load(Ordering::Relaxed) as i64);
1184        self.partition_write_latency
1185            .with_label_values(&[&partition_id_str])
1186            .observe(latency.as_secs_f64());
1187    }
1188
1189    /// Record an error for a partition
1190    ///
1191    /// # Arguments
1192    /// * `partition_id` - The partition ID (0 to partition_count-1)
1193    #[inline]
1194    pub fn record_error(&self, partition_id: u32) {
1195        if partition_id >= self.partition_count {
1196            return;
1197        }
1198
1199        let entry = &self.partitions[partition_id as usize];
1200        entry.error_count.fetch_add(1, Ordering::Relaxed);
1201
1202        // Update Prometheus metrics
1203        let partition_id_str = partition_id.to_string();
1204        self.partition_errors_total
1205            .with_label_values(&[&partition_id_str])
1206            .inc();
1207    }
1208
1209    /// Record a batch write to a partition
1210    ///
1211    /// # Arguments
1212    /// * `partition_id` - The partition ID (0 to partition_count-1)
1213    /// * `count` - Number of events in the batch
1214    /// * `latency` - Total latency for the batch write
1215    #[inline]
1216    pub fn record_batch_write(&self, partition_id: u32, count: u64, latency: Duration) {
1217        if partition_id >= self.partition_count {
1218            return;
1219        }
1220
1221        let entry = &self.partitions[partition_id as usize];
1222        let latency_ns = latency.as_nanos() as u64;
1223
1224        // Update atomic counters
1225        entry.event_count.fetch_add(count, Ordering::Relaxed);
1226        entry.write_count.fetch_add(1, Ordering::Relaxed);
1227        entry.total_latency_ns.fetch_add(latency_ns, Ordering::Relaxed);
1228
1229        // Update min/max latency using per-event average
1230        let per_event_latency_ns = latency_ns / count.max(1);
1231
1232        // Update min latency
1233        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1234        while per_event_latency_ns < current_min {
1235            match entry.min_latency_ns.compare_exchange_weak(
1236                current_min,
1237                per_event_latency_ns,
1238                Ordering::Relaxed,
1239                Ordering::Relaxed,
1240            ) {
1241                Ok(_) => break,
1242                Err(actual) => current_min = actual,
1243            }
1244        }
1245
1246        // Update max latency
1247        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1248        while per_event_latency_ns > current_max {
1249            match entry.max_latency_ns.compare_exchange_weak(
1250                current_max,
1251                per_event_latency_ns,
1252                Ordering::Relaxed,
1253                Ordering::Relaxed,
1254            ) {
1255                Ok(_) => break,
1256                Err(actual) => current_max = actual,
1257            }
1258        }
1259
1260        // Update Prometheus metrics
1261        let partition_id_str = partition_id.to_string();
1262        self.partition_events_total
1263            .with_label_values(&[&partition_id_str])
1264            .set(entry.event_count.load(Ordering::Relaxed) as i64);
1265        self.partition_write_latency
1266            .with_label_values(&[&partition_id_str])
1267            .observe(latency.as_secs_f64());
1268    }
1269
1270    /// Get statistics for a specific partition
1271    pub fn get_partition_stats(&self, partition_id: u32) -> Option<PartitionStats> {
1272        if partition_id >= self.partition_count {
1273            return None;
1274        }
1275
1276        let entry = &self.partitions[partition_id as usize];
1277
1278        Some(PartitionStats {
1279            partition_id,
1280            event_count: entry.event_count.load(Ordering::Relaxed),
1281            total_latency_ns: entry.total_latency_ns.load(Ordering::Relaxed),
1282            write_count: entry.write_count.load(Ordering::Relaxed),
1283            min_latency_ns: entry.min_latency_ns.load(Ordering::Relaxed),
1284            max_latency_ns: entry.max_latency_ns.load(Ordering::Relaxed),
1285            error_count: entry.error_count.load(Ordering::Relaxed),
1286        })
1287    }
1288
1289    /// Get statistics for all partitions
1290    pub fn get_all_partition_stats(&self) -> Vec<PartitionStats> {
1291        (0..self.partition_count)
1292            .filter_map(|id| self.get_partition_stats(id))
1293            .collect()
1294    }
1295
1296    /// Detect partition imbalance (hot partitions)
1297    ///
1298    /// Returns alerts for any partition with >2x average event count.
1299    /// This is the SierraDB pattern for detecting skew and hot partitions.
1300    ///
1301    /// # Returns
1302    /// Vector of alerts for imbalanced partitions
1303    pub fn detect_partition_imbalance(&self) -> Vec<PartitionImbalanceAlert> {
1304        let mut alerts = Vec::new();
1305        let stats = self.get_all_partition_stats();
1306
1307        // Calculate total and average event count
1308        let total_events: u64 = stats.iter().map(|s| s.event_count).sum();
1309        let active_partitions = stats.iter().filter(|s| s.event_count > 0).count();
1310
1311        if active_partitions == 0 {
1312            return alerts;
1313        }
1314
1315        let average_count = total_events as f64 / active_partitions as f64;
1316        let imbalance_threshold = 2.0; // SierraDB threshold: 2x average
1317
1318        for stat in stats {
1319            if stat.event_count == 0 {
1320                continue;
1321            }
1322
1323            let ratio = stat.event_count as f64 / average_count;
1324
1325            if ratio > imbalance_threshold {
1326                alerts.push(PartitionImbalanceAlert {
1327                    partition_id: stat.partition_id,
1328                    event_count: stat.event_count,
1329                    average_count,
1330                    ratio_to_average: ratio,
1331                    message: format!(
1332                        "Partition {} has {:.1}x average load ({} events vs {:.0} avg)",
1333                        stat.partition_id, ratio, stat.event_count, average_count
1334                    ),
1335                    timestamp: chrono::Utc::now(),
1336                });
1337            }
1338        }
1339
1340        alerts
1341    }
1342
1343    /// Get partition count
1344    pub fn partition_count(&self) -> u32 {
1345        self.partition_count
1346    }
1347
1348    /// Get total events across all partitions
1349    pub fn total_events(&self) -> u64 {
1350        self.partitions
1351            .iter()
1352            .map(|e| e.event_count.load(Ordering::Relaxed))
1353            .sum()
1354    }
1355
1356    /// Get total errors across all partitions
1357    pub fn total_errors(&self) -> u64 {
1358        self.partitions
1359            .iter()
1360            .map(|e| e.error_count.load(Ordering::Relaxed))
1361            .sum()
1362    }
1363
1364    /// Get uptime since metrics collection started
1365    pub fn uptime(&self) -> Duration {
1366        self.started_at.elapsed()
1367    }
1368
1369    /// Get the Prometheus registry for this partition metrics
1370    pub fn registry(&self) -> &Registry {
1371        &self.registry
1372    }
1373
1374    /// Encode metrics in Prometheus text format
1375    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
1376        use prometheus::Encoder;
1377        let encoder = prometheus::TextEncoder::new();
1378        let metric_families = self.registry.gather();
1379        let mut buffer = Vec::new();
1380        encoder.encode(&metric_families, &mut buffer)?;
1381        Ok(String::from_utf8(buffer)?)
1382    }
1383
1384    /// Get partition distribution as a map
1385    pub fn get_distribution(&self) -> HashMap<u32, u64> {
1386        self.partitions
1387            .iter()
1388            .enumerate()
1389            .map(|(id, entry)| (id as u32, entry.event_count.load(Ordering::Relaxed)))
1390            .collect()
1391    }
1392
1393    /// Reset all partition metrics
1394    pub fn reset(&self) {
1395        for entry in &self.partitions {
1396            entry.event_count.store(0, Ordering::Relaxed);
1397            entry.total_latency_ns.store(0, Ordering::Relaxed);
1398            entry.write_count.store(0, Ordering::Relaxed);
1399            entry.min_latency_ns.store(u64::MAX, Ordering::Relaxed);
1400            entry.max_latency_ns.store(0, Ordering::Relaxed);
1401            entry.error_count.store(0, Ordering::Relaxed);
1402        }
1403    }
1404}
1405
impl Default for PartitionMetrics {
    /// Defaults to 32 partitions — SierraDB's single-node configuration.
    fn default() -> Self {
        Self::with_default_partitions()
    }
}
1411
#[cfg(test)]
mod partition_tests {
    //! Unit tests for `PartitionMetrics`: recording single/batch writes and
    //! errors, latency bounds, imbalance detection, Prometheus encoding,
    //! reset, and concurrent updates.

    use super::*;
    use std::thread;

    // A fresh tracker reports the configured partition count and no activity.
    #[test]
    fn test_partition_metrics_creation() {
        let metrics = PartitionMetrics::new(32);
        assert_eq!(metrics.partition_count(), 32);
        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    // Default::default() uses the 32-partition single-node configuration.
    #[test]
    fn test_partition_metrics_default() {
        let metrics = PartitionMetrics::default();
        assert_eq!(metrics.partition_count(), 32);
    }

    #[test]
    fn test_record_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(1, Duration::from_micros(150));

        let stats0 = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats0.event_count, 2);
        assert_eq!(stats0.write_count, 2);

        let stats1 = metrics.get_partition_stats(1).unwrap();
        assert_eq!(stats1.event_count, 1);
    }

    // A batch counts as one write carrying `count` events.
    #[test]
    fn test_record_batch_write() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_batch_write(5, 100, Duration::from_millis(10));

        let stats = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats.event_count, 100);
        assert_eq!(stats.write_count, 1);
    }

    #[test]
    fn test_record_error() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_error(3);
        metrics.record_error(3);
        metrics.record_error(5);

        let stats3 = metrics.get_partition_stats(3).unwrap();
        assert_eq!(stats3.error_count, 2);

        let stats5 = metrics.get_partition_stats(5).unwrap();
        assert_eq!(stats5.error_count, 1);

        assert_eq!(metrics.total_errors(), 3);
    }

    #[test]
    fn test_invalid_partition_id() {
        let metrics = PartitionMetrics::new(32);

        // Should not panic, just ignore invalid partition IDs
        metrics.record_write(100, Duration::from_micros(100));
        metrics.record_error(100);

        assert!(metrics.get_partition_stats(100).is_none());
    }

    // Min, max, and average latencies are all derived from the same samples.
    #[test]
    fn test_latency_tracking() {
        let metrics = PartitionMetrics::new(32);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(200));
        metrics.record_write(0, Duration::from_micros(300));

        let stats = metrics.get_partition_stats(0).unwrap();
        assert_eq!(stats.min_latency_ns, 100_000); // 100 microseconds in nanoseconds
        assert_eq!(stats.max_latency_ns, 300_000); // 300 microseconds in nanoseconds

        let avg = stats.avg_latency().unwrap();
        assert_eq!(avg, Duration::from_nanos(200_000)); // Average: 200 microseconds
    }

    #[test]
    fn test_detect_partition_imbalance_no_imbalance() {
        let metrics = PartitionMetrics::new(4);

        // Distribute events evenly: every partition sits at exactly 1.0x average
        for i in 0..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(alerts.is_empty(), "No alerts expected for balanced partitions");
    }

    #[test]
    fn test_detect_partition_imbalance_hot_partition() {
        let metrics = PartitionMetrics::new(4);

        // Partition 0 gets 500 events, others get 100 each
        // Average = (500 + 100 + 100 + 100) / 4 = 200
        // Partition 0 ratio = 500/200 = 2.5x (>2x threshold)
        for _ in 0..500 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert_eq!(alerts.len(), 1, "Expected one alert for hot partition");
        assert_eq!(alerts[0].partition_id, 0);
        assert!(alerts[0].ratio_to_average > 2.0);
    }

    // No active partitions means no average to compare against — no alerts.
    #[test]
    fn test_detect_partition_imbalance_empty() {
        let metrics = PartitionMetrics::new(4);

        let alerts = metrics.detect_partition_imbalance();
        assert!(alerts.is_empty(), "No alerts expected for empty metrics");
    }

    // get_all_partition_stats returns a snapshot for EVERY partition,
    // including partitions that saw no events.
    #[test]
    fn test_get_all_partition_stats() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(200));

        let all_stats = metrics.get_all_partition_stats();
        assert_eq!(all_stats.len(), 4);
        assert_eq!(all_stats[0].event_count, 1);
        assert_eq!(all_stats[1].event_count, 0);
        assert_eq!(all_stats[2].event_count, 1);
        assert_eq!(all_stats[3].event_count, 0);
    }

    // The text exposition format must contain all three metric families.
    #[test]
    fn test_prometheus_encoding() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(1, Duration::from_micros(200));
        metrics.record_error(0);

        let encoded = metrics.encode().unwrap();

        assert!(encoded.contains("allsource_partition_events_total"));
        assert!(encoded.contains("allsource_partition_write_latency"));
        assert!(encoded.contains("allsource_partition_errors_total"));
    }

    #[test]
    fn test_reset() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_error(1);

        assert_eq!(metrics.total_events(), 1);
        assert_eq!(metrics.total_errors(), 1);

        metrics.reset();

        assert_eq!(metrics.total_events(), 0);
        assert_eq!(metrics.total_errors(), 0);
    }

    // Exercises the lock-free update path: atomics must not lose counts
    // under contention.
    #[test]
    fn test_concurrent_writes() {
        let metrics = Arc::new(PartitionMetrics::new(32));
        let mut handles = vec![];

        // Spawn 8 threads, each writing 1000 events round-robin across the
        // 32 partitions (i % 32 — deterministic, not random)
        for _ in 0..8 {
            let metrics_clone = metrics.clone();
            let handle = thread::spawn(move || {
                for i in 0..1000 {
                    let partition_id = (i % 32) as u32;
                    metrics_clone.record_write(partition_id, Duration::from_micros(100));
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(metrics.total_events(), 8000);
    }

    // Distribution map covers every partition, zero-count ones included.
    #[test]
    fn test_get_distribution() {
        let metrics = PartitionMetrics::new(4);

        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(0, Duration::from_micros(100));
        metrics.record_write(2, Duration::from_micros(100));

        let distribution = metrics.get_distribution();

        assert_eq!(distribution.get(&0), Some(&2));
        assert_eq!(distribution.get(&1), Some(&0));
        assert_eq!(distribution.get(&2), Some(&1));
        assert_eq!(distribution.get(&3), Some(&0));
    }

    // With zero writes there is no average latency (avoids division by zero).
    #[test]
    fn test_partition_stats_avg_latency_none() {
        let stats = PartitionStats::new(0);
        assert!(stats.avg_latency().is_none());
    }

    #[test]
    fn test_alert_message_format() {
        let metrics = PartitionMetrics::new(4);

        // Create imbalanced scenario
        for _ in 0..1000 {
            metrics.record_write(0, Duration::from_micros(100));
        }
        for i in 1..4 {
            for _ in 0..100 {
                metrics.record_write(i, Duration::from_micros(100));
            }
        }

        let alerts = metrics.detect_partition_imbalance();
        assert!(!alerts.is_empty());

        let alert = &alerts[0];
        assert!(alert.message.contains("Partition 0"));
        assert!(alert.message.contains("average load"));
    }

    #[test]
    fn test_uptime() {
        let metrics = PartitionMetrics::new(4);

        // Sleep a bit to ensure non-zero uptime
        thread::sleep(Duration::from_millis(10));

        let uptime = metrics.uptime();
        assert!(uptime.as_millis() >= 10);
    }
}