//! allsource_core/infrastructure/observability/metrics.rs
//!
//! Prometheus metric definitions for AllSource.
1use prometheus::{
2    Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts,
3    Registry,
4};
5use serde::Serialize;
6use std::{
7    collections::HashMap,
8    sync::{
9        Arc,
10        atomic::{AtomicU64, Ordering},
11    },
12    time::{Duration, Instant},
13};
14
/// Centralized metrics registry for AllSource.
///
/// Owns the Prometheus `Registry` together with a handle to every metric
/// the process exports. Constructed once via [`MetricsRegistry::new`] and
/// shared across the application as `Arc<MetricsRegistry>`; all metric
/// fields are public so call sites can increment/observe them directly.
pub struct MetricsRegistry {
    /// Prometheus registry (every public metric below is registered here)
    registry: Registry,

    // Event ingestion metrics
    pub events_ingested_total: IntCounter,
    pub events_ingested_by_type: IntCounterVec,
    pub ingestion_duration_seconds: Histogram,
    pub ingestion_errors_total: IntCounter,

    // Query metrics (all labelled by `query_type`)
    pub queries_total: IntCounterVec,
    pub query_duration_seconds: HistogramVec,
    pub query_results_total: IntCounterVec,

    // Storage metrics
    pub storage_events_total: IntGauge,
    pub storage_entities_total: IntGauge,
    pub storage_size_bytes: IntGauge,
    pub parquet_files_total: IntGauge,
    pub wal_segments_total: IntGauge,

    // Projection metrics (vec metrics labelled by `projection_name`)
    pub projections_total: IntGauge,
    pub projection_events_processed: IntCounterVec,
    pub projection_errors_total: IntCounterVec,
    pub projection_processing_duration: HistogramVec,
    pub projection_duration_seconds: Histogram,

    // Schema registry metrics (v0.5)
    pub schemas_registered_total: IntCounter,
    pub schema_validations_total: IntCounterVec,
    pub schema_validation_duration: Histogram,

    // Replay metrics (v0.5)
    pub replays_started_total: IntCounter,
    pub replays_completed_total: IntCounter,
    pub replays_failed_total: IntCounter,
    pub replay_events_processed: IntCounter,
    pub replay_duration_seconds: Histogram,

    // Pipeline metrics (v0.5). Most vec metrics are labelled by
    // (`pipeline_id`, `pipeline_name`); `pipeline_errors_total` uses
    // `pipeline_name` only.
    pub pipelines_registered_total: IntGauge,
    pub pipeline_events_processed: IntCounterVec,
    pub pipeline_events_filtered: IntCounterVec,
    pub pipeline_errors_total: IntCounterVec,
    pub pipeline_processing_duration: HistogramVec,
    pub pipeline_duration_seconds: Histogram,

    // Snapshot metrics
    pub snapshots_created_total: IntCounter,
    pub snapshot_creation_duration: Histogram,
    pub snapshots_total: IntGauge,

    // Compaction metrics
    pub compactions_total: IntCounter,
    pub compaction_duration_seconds: Histogram,
    pub compaction_files_merged: IntCounter,
    pub compaction_bytes_saved: IntCounter,

    // WebSocket metrics
    pub websocket_connections_active: IntGauge,
    pub websocket_connections_total: IntCounter,
    pub websocket_messages_sent: IntCounter,
    pub websocket_errors_total: IntCounter,

    // System metrics
    pub http_requests_total: IntCounterVec,
    pub http_request_duration_seconds: HistogramVec,
    pub http_requests_in_flight: IntGauge,

    // Replication metrics (leader)
    pub replication_followers_connected: IntGauge,
    pub replication_wal_shipped_total: IntCounter,
    pub replication_wal_shipped_bytes_total: IntCounter,
    pub replication_follower_lag_seconds: IntGaugeVec,
    pub replication_acks_total: IntCounter,

    // Replication ACK wait metric (semi-sync/sync mode)
    pub replication_ack_wait_seconds: Histogram,

    // Replication metrics (follower)
    pub replication_wal_received_total: IntCounter,
    pub replication_wal_replayed_total: IntCounter,
    pub replication_lag_seconds: IntGauge,
    pub replication_connected: IntGauge,
    pub replication_reconnects_total: IntCounter,

    // Cache metrics (Step 3 of the sustainable data strategy).
    // The cache is the in-memory representation of lazy-loaded
    // tenant data; both metrics are scoped to that side of the
    // system. `parquet_files_total` and `storage_size_bytes`
    // already cover disk-side state.
    pub cache_evictions_total: IntCounter,
    pub cache_bytes: IntGauge,

    // WAL replay metric (Step 6 of the sustainable data strategy).
    // Set on each successful WAL recovery; reflects the size of the
    // last replay. After a checkpoint truncates the WAL, the next
    // recovery should report a value bounded by the checkpoint
    // interval — that's the metric to graph to verify Step 6's
    // bounded-replay invariant.
    pub wal_replay_events_total: IntGauge,
}
120
121impl MetricsRegistry {
122    pub fn new() -> Arc<Self> {
123        let registry = Registry::new();
124
125        // Event ingestion metrics
126        let events_ingested_total = IntCounter::with_opts(Opts::new(
127            "allsource_events_ingested_total",
128            "Total number of events ingested",
129        ))
130        .unwrap();
131
132        let events_ingested_by_type = IntCounterVec::new(
133            Opts::new(
134                "allsource_events_ingested_by_type",
135                "Events ingested by type",
136            ),
137            &["event_type"],
138        )
139        .unwrap();
140
141        let ingestion_duration_seconds = Histogram::with_opts(HistogramOpts::new(
142            "allsource_ingestion_duration_seconds",
143            "Event ingestion duration in seconds",
144        ))
145        .unwrap();
146
147        let ingestion_errors_total = IntCounter::with_opts(Opts::new(
148            "allsource_ingestion_errors_total",
149            "Total number of ingestion errors",
150        ))
151        .unwrap();
152
153        // Query metrics
154        let queries_total = IntCounterVec::new(
155            Opts::new("allsource_queries_total", "Total number of queries"),
156            &["query_type"],
157        )
158        .unwrap();
159
160        let query_duration_seconds = HistogramVec::new(
161            HistogramOpts::new(
162                "allsource_query_duration_seconds",
163                "Query duration in seconds",
164            ),
165            &["query_type"],
166        )
167        .unwrap();
168
169        let query_results_total = IntCounterVec::new(
170            Opts::new(
171                "allsource_query_results_total",
172                "Total number of events returned by queries",
173            ),
174            &["query_type"],
175        )
176        .unwrap();
177
178        // Storage metrics
179        let storage_events_total = IntGauge::with_opts(Opts::new(
180            "allsource_storage_events_total",
181            "Total number of events in storage",
182        ))
183        .unwrap();
184
185        let storage_entities_total = IntGauge::with_opts(Opts::new(
186            "allsource_storage_entities_total",
187            "Total number of entities in storage",
188        ))
189        .unwrap();
190
191        let storage_size_bytes = IntGauge::with_opts(Opts::new(
192            "allsource_storage_size_bytes",
193            "Total storage size in bytes",
194        ))
195        .unwrap();
196
197        let parquet_files_total = IntGauge::with_opts(Opts::new(
198            "allsource_parquet_files_total",
199            "Number of Parquet files",
200        ))
201        .unwrap();
202
203        let wal_segments_total = IntGauge::with_opts(Opts::new(
204            "allsource_wal_segments_total",
205            "Number of WAL segments",
206        ))
207        .unwrap();
208
209        // Projection metrics
210        let projection_events_processed = IntCounterVec::new(
211            Opts::new(
212                "allsource_projection_events_processed",
213                "Events processed by projections",
214            ),
215            &["projection_name"],
216        )
217        .unwrap();
218
219        let projection_errors_total = IntCounterVec::new(
220            Opts::new(
221                "allsource_projection_errors_total",
222                "Total projection errors",
223            ),
224            &["projection_name"],
225        )
226        .unwrap();
227
228        let projection_processing_duration = HistogramVec::new(
229            HistogramOpts::new(
230                "allsource_projection_processing_duration_seconds",
231                "Projection processing duration",
232            ),
233            &["projection_name"],
234        )
235        .unwrap();
236
237        let projections_total = IntGauge::with_opts(Opts::new(
238            "allsource_projections_total",
239            "Number of registered projections",
240        ))
241        .unwrap();
242
243        let projection_duration_seconds = Histogram::with_opts(HistogramOpts::new(
244            "allsource_projection_duration_seconds",
245            "Overall projection manager processing duration",
246        ))
247        .unwrap();
248
249        // Schema registry metrics (v0.5)
250        let schemas_registered_total = IntCounter::with_opts(Opts::new(
251            "allsource_schemas_registered_total",
252            "Total number of schemas registered",
253        ))
254        .unwrap();
255
256        let schema_validations_total = IntCounterVec::new(
257            Opts::new(
258                "allsource_schema_validations_total",
259                "Schema validations by result",
260            ),
261            &["subject", "result"],
262        )
263        .unwrap();
264
265        let schema_validation_duration = Histogram::with_opts(HistogramOpts::new(
266            "allsource_schema_validation_duration_seconds",
267            "Schema validation duration",
268        ))
269        .unwrap();
270
271        // Replay metrics (v0.5)
272        let replays_started_total = IntCounter::with_opts(Opts::new(
273            "allsource_replays_started_total",
274            "Total replays started",
275        ))
276        .unwrap();
277
278        let replays_completed_total = IntCounter::with_opts(Opts::new(
279            "allsource_replays_completed_total",
280            "Total replays completed",
281        ))
282        .unwrap();
283
284        let replays_failed_total = IntCounter::with_opts(Opts::new(
285            "allsource_replays_failed_total",
286            "Total replays failed",
287        ))
288        .unwrap();
289
290        let replay_events_processed = IntCounter::with_opts(Opts::new(
291            "allsource_replay_events_processed",
292            "Events processed during replays",
293        ))
294        .unwrap();
295
296        let replay_duration_seconds = Histogram::with_opts(HistogramOpts::new(
297            "allsource_replay_duration_seconds",
298            "Replay duration",
299        ))
300        .unwrap();
301
302        // Pipeline metrics (v0.5)
303        let pipelines_registered_total = IntGauge::with_opts(Opts::new(
304            "allsource_pipelines_registered_total",
305            "Number of registered pipelines",
306        ))
307        .unwrap();
308
309        let pipeline_events_processed = IntCounterVec::new(
310            Opts::new(
311                "allsource_pipeline_events_processed",
312                "Events processed by pipelines",
313            ),
314            &["pipeline_id", "pipeline_name"],
315        )
316        .unwrap();
317
318        let pipeline_events_filtered = IntCounterVec::new(
319            Opts::new(
320                "allsource_pipeline_events_filtered",
321                "Events filtered by pipelines",
322            ),
323            &["pipeline_id", "pipeline_name"],
324        )
325        .unwrap();
326
327        let pipeline_processing_duration = HistogramVec::new(
328            HistogramOpts::new(
329                "allsource_pipeline_processing_duration_seconds",
330                "Pipeline processing duration",
331            ),
332            &["pipeline_id", "pipeline_name"],
333        )
334        .unwrap();
335
336        let pipeline_errors_total = IntCounterVec::new(
337            Opts::new("allsource_pipeline_errors_total", "Total pipeline errors"),
338            &["pipeline_name"],
339        )
340        .unwrap();
341
342        let pipeline_duration_seconds = Histogram::with_opts(HistogramOpts::new(
343            "allsource_pipeline_duration_seconds",
344            "Overall pipeline manager processing duration",
345        ))
346        .unwrap();
347
348        // Snapshot metrics
349        let snapshots_created_total = IntCounter::with_opts(Opts::new(
350            "allsource_snapshots_created_total",
351            "Total snapshots created",
352        ))
353        .unwrap();
354
355        let snapshot_creation_duration = Histogram::with_opts(HistogramOpts::new(
356            "allsource_snapshot_creation_duration_seconds",
357            "Snapshot creation duration",
358        ))
359        .unwrap();
360
361        let snapshots_total = IntGauge::with_opts(Opts::new(
362            "allsource_snapshots_total",
363            "Total number of snapshots",
364        ))
365        .unwrap();
366
367        // Compaction metrics
368        let compactions_total = IntCounter::with_opts(Opts::new(
369            "allsource_compactions_total",
370            "Total compactions performed",
371        ))
372        .unwrap();
373
374        let compaction_duration_seconds = Histogram::with_opts(HistogramOpts::new(
375            "allsource_compaction_duration_seconds",
376            "Compaction duration",
377        ))
378        .unwrap();
379
380        let compaction_files_merged = IntCounter::with_opts(Opts::new(
381            "allsource_compaction_files_merged",
382            "Files merged during compaction",
383        ))
384        .unwrap();
385
386        let compaction_bytes_saved = IntCounter::with_opts(Opts::new(
387            "allsource_compaction_bytes_saved",
388            "Bytes saved by compaction",
389        ))
390        .unwrap();
391
392        // Cache metrics (Step 3): per-tenant in-memory eviction
393        let cache_evictions_total = IntCounter::with_opts(Opts::new(
394            "allsource_core_cache_evictions_total",
395            "Total tenant evictions from the in-memory cache",
396        ))
397        .unwrap();
398
399        let cache_bytes = IntGauge::with_opts(Opts::new(
400            "allsource_core_cache_bytes",
401            "Approximate resident bytes in the in-memory tenant cache",
402        ))
403        .unwrap();
404
405        // WAL replay metric (Step 6): size of last recovery.
406        let wal_replay_events_total = IntGauge::with_opts(Opts::new(
407            "allsource_core_wal_replay_events_total",
408            "Number of events replayed from the WAL on the most recent boot",
409        ))
410        .unwrap();
411
412        // WebSocket metrics
413        let websocket_connections_active = IntGauge::with_opts(Opts::new(
414            "allsource_websocket_connections_active",
415            "Active WebSocket connections",
416        ))
417        .unwrap();
418
419        let websocket_connections_total = IntCounter::with_opts(Opts::new(
420            "allsource_websocket_connections_total",
421            "Total WebSocket connections",
422        ))
423        .unwrap();
424
425        let websocket_messages_sent = IntCounter::with_opts(Opts::new(
426            "allsource_websocket_messages_sent",
427            "WebSocket messages sent",
428        ))
429        .unwrap();
430
431        let websocket_errors_total = IntCounter::with_opts(Opts::new(
432            "allsource_websocket_errors_total",
433            "WebSocket errors",
434        ))
435        .unwrap();
436
437        // System metrics
438        let http_requests_total = IntCounterVec::new(
439            Opts::new("allsource_http_requests_total", "Total HTTP requests"),
440            &["method", "endpoint", "status"],
441        )
442        .unwrap();
443
444        let http_request_duration_seconds = HistogramVec::new(
445            HistogramOpts::new(
446                "allsource_http_request_duration_seconds",
447                "HTTP request duration",
448            ),
449            &["method", "endpoint"],
450        )
451        .unwrap();
452
453        let http_requests_in_flight = IntGauge::with_opts(Opts::new(
454            "allsource_http_requests_in_flight",
455            "HTTP requests currently being processed",
456        ))
457        .unwrap();
458
459        // Replication metrics (leader)
460        let replication_followers_connected = IntGauge::with_opts(Opts::new(
461            "allsource_replication_followers_connected",
462            "Number of connected followers",
463        ))
464        .unwrap();
465
466        let replication_wal_shipped_total = IntCounter::with_opts(Opts::new(
467            "allsource_replication_wal_shipped_total",
468            "Total WAL entries shipped to followers",
469        ))
470        .unwrap();
471
472        let replication_wal_shipped_bytes_total = IntCounter::with_opts(Opts::new(
473            "allsource_replication_wal_shipped_bytes_total",
474            "Total bytes shipped to followers",
475        ))
476        .unwrap();
477
478        let replication_follower_lag_seconds = IntGaugeVec::new(
479            Opts::new(
480                "allsource_replication_follower_lag_seconds",
481                "Per-follower replication lag in seconds",
482            ),
483            &["follower_id"],
484        )
485        .unwrap();
486
487        let replication_acks_total = IntCounter::with_opts(Opts::new(
488            "allsource_replication_acks_total",
489            "Total ACKs received from followers",
490        ))
491        .unwrap();
492
493        let replication_ack_wait_seconds = Histogram::with_opts(
494            HistogramOpts::new(
495                "allsource_replication_ack_wait_seconds",
496                "Time spent waiting for follower ACKs in semi-sync/sync mode",
497            )
498            .buckets(vec![
499                0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
500            ]),
501        )
502        .unwrap();
503
504        // Replication metrics (follower)
505        let replication_wal_received_total = IntCounter::with_opts(Opts::new(
506            "allsource_replication_wal_received_total",
507            "Total WAL entries received from leader",
508        ))
509        .unwrap();
510
511        let replication_wal_replayed_total = IntCounter::with_opts(Opts::new(
512            "allsource_replication_wal_replayed_total",
513            "Total WAL entries replayed into DashMap",
514        ))
515        .unwrap();
516
517        let replication_lag_seconds = IntGauge::with_opts(Opts::new(
518            "allsource_replication_lag_seconds",
519            "Replication lag behind leader in seconds",
520        ))
521        .unwrap();
522
523        let replication_connected = IntGauge::with_opts(Opts::new(
524            "allsource_replication_connected",
525            "Whether connected to leader (1=connected, 0=disconnected)",
526        ))
527        .unwrap();
528
529        let replication_reconnects_total = IntCounter::with_opts(Opts::new(
530            "allsource_replication_reconnects_total",
531            "Total reconnection attempts to leader",
532        ))
533        .unwrap();
534
535        // Register all metrics
536        registry
537            .register(Box::new(events_ingested_total.clone()))
538            .unwrap();
539        registry
540            .register(Box::new(events_ingested_by_type.clone()))
541            .unwrap();
542        registry
543            .register(Box::new(ingestion_duration_seconds.clone()))
544            .unwrap();
545        registry
546            .register(Box::new(ingestion_errors_total.clone()))
547            .unwrap();
548
549        registry.register(Box::new(queries_total.clone())).unwrap();
550        registry
551            .register(Box::new(query_duration_seconds.clone()))
552            .unwrap();
553        registry
554            .register(Box::new(query_results_total.clone()))
555            .unwrap();
556
557        registry
558            .register(Box::new(storage_events_total.clone()))
559            .unwrap();
560        registry
561            .register(Box::new(storage_entities_total.clone()))
562            .unwrap();
563        registry
564            .register(Box::new(storage_size_bytes.clone()))
565            .unwrap();
566        registry
567            .register(Box::new(parquet_files_total.clone()))
568            .unwrap();
569        registry
570            .register(Box::new(wal_segments_total.clone()))
571            .unwrap();
572
573        registry
574            .register(Box::new(projection_events_processed.clone()))
575            .unwrap();
576        registry
577            .register(Box::new(projection_errors_total.clone()))
578            .unwrap();
579        registry
580            .register(Box::new(projection_processing_duration.clone()))
581            .unwrap();
582        registry
583            .register(Box::new(projections_total.clone()))
584            .unwrap();
585        registry
586            .register(Box::new(projection_duration_seconds.clone()))
587            .unwrap();
588
589        registry
590            .register(Box::new(schemas_registered_total.clone()))
591            .unwrap();
592        registry
593            .register(Box::new(schema_validations_total.clone()))
594            .unwrap();
595        registry
596            .register(Box::new(schema_validation_duration.clone()))
597            .unwrap();
598
599        registry
600            .register(Box::new(replays_started_total.clone()))
601            .unwrap();
602        registry
603            .register(Box::new(replays_completed_total.clone()))
604            .unwrap();
605        registry
606            .register(Box::new(replays_failed_total.clone()))
607            .unwrap();
608        registry
609            .register(Box::new(replay_events_processed.clone()))
610            .unwrap();
611        registry
612            .register(Box::new(replay_duration_seconds.clone()))
613            .unwrap();
614
615        registry
616            .register(Box::new(pipelines_registered_total.clone()))
617            .unwrap();
618        registry
619            .register(Box::new(pipeline_events_processed.clone()))
620            .unwrap();
621        registry
622            .register(Box::new(pipeline_events_filtered.clone()))
623            .unwrap();
624        registry
625            .register(Box::new(pipeline_processing_duration.clone()))
626            .unwrap();
627        registry
628            .register(Box::new(pipeline_errors_total.clone()))
629            .unwrap();
630        registry
631            .register(Box::new(pipeline_duration_seconds.clone()))
632            .unwrap();
633
634        registry
635            .register(Box::new(snapshots_created_total.clone()))
636            .unwrap();
637        registry
638            .register(Box::new(snapshot_creation_duration.clone()))
639            .unwrap();
640        registry
641            .register(Box::new(snapshots_total.clone()))
642            .unwrap();
643
644        registry
645            .register(Box::new(compactions_total.clone()))
646            .unwrap();
647        registry
648            .register(Box::new(compaction_duration_seconds.clone()))
649            .unwrap();
650        registry
651            .register(Box::new(compaction_files_merged.clone()))
652            .unwrap();
653        registry
654            .register(Box::new(compaction_bytes_saved.clone()))
655            .unwrap();
656
657        registry
658            .register(Box::new(cache_evictions_total.clone()))
659            .unwrap();
660        registry.register(Box::new(cache_bytes.clone())).unwrap();
661        registry
662            .register(Box::new(wal_replay_events_total.clone()))
663            .unwrap();
664
665        registry
666            .register(Box::new(websocket_connections_active.clone()))
667            .unwrap();
668        registry
669            .register(Box::new(websocket_connections_total.clone()))
670            .unwrap();
671        registry
672            .register(Box::new(websocket_messages_sent.clone()))
673            .unwrap();
674        registry
675            .register(Box::new(websocket_errors_total.clone()))
676            .unwrap();
677
678        registry
679            .register(Box::new(http_requests_total.clone()))
680            .unwrap();
681        registry
682            .register(Box::new(http_request_duration_seconds.clone()))
683            .unwrap();
684        registry
685            .register(Box::new(http_requests_in_flight.clone()))
686            .unwrap();
687
688        registry
689            .register(Box::new(replication_followers_connected.clone()))
690            .unwrap();
691        registry
692            .register(Box::new(replication_wal_shipped_total.clone()))
693            .unwrap();
694        registry
695            .register(Box::new(replication_wal_shipped_bytes_total.clone()))
696            .unwrap();
697        registry
698            .register(Box::new(replication_follower_lag_seconds.clone()))
699            .unwrap();
700        registry
701            .register(Box::new(replication_acks_total.clone()))
702            .unwrap();
703        registry
704            .register(Box::new(replication_ack_wait_seconds.clone()))
705            .unwrap();
706        registry
707            .register(Box::new(replication_wal_received_total.clone()))
708            .unwrap();
709        registry
710            .register(Box::new(replication_wal_replayed_total.clone()))
711            .unwrap();
712        registry
713            .register(Box::new(replication_lag_seconds.clone()))
714            .unwrap();
715        registry
716            .register(Box::new(replication_connected.clone()))
717            .unwrap();
718        registry
719            .register(Box::new(replication_reconnects_total.clone()))
720            .unwrap();
721
722        Arc::new(Self {
723            registry,
724            events_ingested_total,
725            events_ingested_by_type,
726            ingestion_duration_seconds,
727            ingestion_errors_total,
728            queries_total,
729            query_duration_seconds,
730            query_results_total,
731            storage_events_total,
732            storage_entities_total,
733            storage_size_bytes,
734            parquet_files_total,
735            wal_segments_total,
736            projections_total,
737            projection_events_processed,
738            projection_errors_total,
739            projection_processing_duration,
740            projection_duration_seconds,
741            schemas_registered_total,
742            schema_validations_total,
743            schema_validation_duration,
744            replays_started_total,
745            replays_completed_total,
746            replays_failed_total,
747            replay_events_processed,
748            replay_duration_seconds,
749            pipelines_registered_total,
750            pipeline_events_processed,
751            pipeline_events_filtered,
752            pipeline_errors_total,
753            pipeline_processing_duration,
754            pipeline_duration_seconds,
755            snapshots_created_total,
756            snapshot_creation_duration,
757            snapshots_total,
758            compactions_total,
759            compaction_duration_seconds,
760            compaction_files_merged,
761            compaction_bytes_saved,
762            websocket_connections_active,
763            websocket_connections_total,
764            websocket_messages_sent,
765            websocket_errors_total,
766            http_requests_total,
767            http_request_duration_seconds,
768            http_requests_in_flight,
769            replication_followers_connected,
770            replication_wal_shipped_total,
771            replication_wal_shipped_bytes_total,
772            replication_follower_lag_seconds,
773            replication_acks_total,
774            replication_ack_wait_seconds,
775            replication_wal_received_total,
776            replication_wal_replayed_total,
777            replication_lag_seconds,
778            replication_connected,
779            replication_reconnects_total,
780            cache_evictions_total,
781            cache_bytes,
782            wal_replay_events_total,
783        })
784    }
785
786    /// Get the Prometheus registry
787    pub fn registry(&self) -> &Registry {
788        &self.registry
789    }
790
791    /// Encode metrics in Prometheus text format
792    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
793        use prometheus::Encoder;
794        let encoder = prometheus::TextEncoder::new();
795        let metric_families = self.registry.gather();
796        let mut buffer = Vec::new();
797        encoder.encode(&metric_families, &mut buffer)?;
798        Ok(String::from_utf8(buffer)?)
799    }
800}
801
802// Note: Clone and Default are intentionally NOT implemented for MetricsRegistry.
803// Use Arc<MetricsRegistry> to share the same registry across the application.
804// Creating multiple registries would result in duplicate metrics which is incorrect.
805
806#[cfg(test)]
807mod tests {
808    use super::*;
809
810    #[test]
811    fn test_metrics_registry_creation() {
812        let metrics = MetricsRegistry::new();
813        assert_eq!(metrics.events_ingested_total.get(), 0);
814        assert_eq!(metrics.storage_events_total.get(), 0);
815    }
816
817    #[test]
818    fn test_event_ingestion_metrics() {
819        let metrics = MetricsRegistry::new();
820
821        // Increment ingestion counter
822        metrics.events_ingested_total.inc();
823        assert_eq!(metrics.events_ingested_total.get(), 1);
824
825        // Increment by type
826        metrics
827            .events_ingested_by_type
828            .with_label_values(&["user.created"])
829            .inc();
830        assert_eq!(
831            metrics
832                .events_ingested_by_type
833                .with_label_values(&["user.created"])
834                .get(),
835            1
836        );
837
838        // Record duration
839        metrics.ingestion_duration_seconds.observe(0.1);
840    }
841
842    #[test]
843    fn test_query_metrics() {
844        let metrics = MetricsRegistry::new();
845
846        // Increment query counter
847        metrics
848            .queries_total
849            .with_label_values(&["entity_id"])
850            .inc();
851        assert_eq!(
852            metrics
853                .queries_total
854                .with_label_values(&["entity_id"])
855                .get(),
856            1
857        );
858
859        // Record query duration
860        metrics
861            .query_duration_seconds
862            .with_label_values(&["entity_id"])
863            .observe(0.05);
864
865        // Record query results
866        metrics
867            .query_results_total
868            .with_label_values(&["entity_id"])
869            .inc_by(10);
870    }
871
872    #[test]
873    fn test_storage_metrics() {
874        let metrics = MetricsRegistry::new();
875
876        // Set storage metrics
877        metrics.storage_events_total.set(1000);
878        assert_eq!(metrics.storage_events_total.get(), 1000);
879
880        metrics.storage_entities_total.set(50);
881        assert_eq!(metrics.storage_entities_total.get(), 50);
882
883        metrics.storage_size_bytes.set(1024 * 1024);
884        assert_eq!(metrics.storage_size_bytes.get(), 1024 * 1024);
885
886        metrics.parquet_files_total.set(5);
887        metrics.wal_segments_total.set(3);
888    }
889
890    #[test]
891    fn test_projection_metrics() {
892        let metrics = MetricsRegistry::new();
893
894        // Set projections total
895        metrics.projections_total.set(3);
896        assert_eq!(metrics.projections_total.get(), 3);
897
898        // Process events in projection
899        metrics
900            .projection_events_processed
901            .with_label_values(&["user_snapshot"])
902            .inc_by(100);
903
904        // Record processing duration
905        metrics
906            .projection_processing_duration
907            .with_label_values(&["user_snapshot"])
908            .observe(0.2);
909
910        // Record errors
911        metrics
912            .projection_errors_total
913            .with_label_values(&["user_snapshot"])
914            .inc();
915    }
916
917    #[test]
918    fn test_schema_metrics() {
919        let metrics = MetricsRegistry::new();
920
921        // Register schema
922        metrics.schemas_registered_total.inc();
923        assert_eq!(metrics.schemas_registered_total.get(), 1);
924
925        // Validation success - requires both subject and result labels
926        metrics
927            .schema_validations_total
928            .with_label_values(&["user.schema", "success"])
929            .inc();
930
931        // Validation failure
932        metrics
933            .schema_validations_total
934            .with_label_values(&["order.schema", "failure"])
935            .inc();
936
937        // Record validation duration
938        metrics.schema_validation_duration.observe(0.01);
939    }
940
941    #[test]
942    fn test_replay_metrics() {
943        let metrics = MetricsRegistry::new();
944
945        // Start replay
946        metrics.replays_started_total.inc();
947        assert_eq!(metrics.replays_started_total.get(), 1);
948
949        // Process events
950        metrics.replay_events_processed.inc_by(500);
951        assert_eq!(metrics.replay_events_processed.get(), 500);
952
953        // Complete replay
954        metrics.replays_completed_total.inc();
955        assert_eq!(metrics.replays_completed_total.get(), 1);
956
957        // Record duration
958        metrics.replay_duration_seconds.observe(5.5);
959    }
960
961    #[test]
962    fn test_pipeline_metrics() {
963        let metrics = MetricsRegistry::new();
964
965        // Register pipeline
966        metrics.pipelines_registered_total.set(2);
967        assert_eq!(metrics.pipelines_registered_total.get(), 2);
968
969        // Process events - requires both pipeline_id and pipeline_name labels
970        metrics
971            .pipeline_events_processed
972            .with_label_values(&["pipeline-1", "filter_pipeline"])
973            .inc_by(250);
974
975        // Record errors - only requires pipeline_name
976        metrics
977            .pipeline_errors_total
978            .with_label_values(&["filter_pipeline"])
979            .inc();
980
981        // Record duration - requires both pipeline_id and pipeline_name labels
982        metrics
983            .pipeline_processing_duration
984            .with_label_values(&["pipeline-1", "filter_pipeline"])
985            .observe(0.15);
986    }
987
988    #[test]
989    fn test_metrics_encode() {
990        let metrics = MetricsRegistry::new();
991
992        // Add some data
993        metrics.events_ingested_total.inc_by(100);
994        metrics.storage_events_total.set(1000);
995
996        // Encode to Prometheus format
997        let encoded = metrics.encode().unwrap();
998
999        // Verify output contains metrics
1000        assert!(encoded.contains("events_ingested_total"));
1001        assert!(encoded.contains("storage_events_total"));
1002    }
1003
1004    #[test]
1005    fn test_metrics_default() {
1006        let metrics = MetricsRegistry::new();
1007        assert_eq!(metrics.events_ingested_total.get(), 0);
1008    }
1009
1010    #[test]
1011    fn test_websocket_metrics() {
1012        let metrics = MetricsRegistry::new();
1013
1014        // Connect client
1015        metrics.websocket_connections_active.inc();
1016        assert_eq!(metrics.websocket_connections_active.get(), 1);
1017
1018        // Total connections
1019        metrics.websocket_connections_total.inc();
1020
1021        // Broadcast message
1022        metrics.websocket_messages_sent.inc_by(10);
1023        assert_eq!(metrics.websocket_messages_sent.get(), 10);
1024
1025        // Disconnect client
1026        metrics.websocket_connections_active.dec();
1027        assert_eq!(metrics.websocket_connections_active.get(), 0);
1028
1029        // Record error
1030        metrics.websocket_errors_total.inc();
1031    }
1032
1033    #[test]
1034    fn test_compaction_metrics() {
1035        let metrics = MetricsRegistry::new();
1036
1037        // Start compaction
1038        metrics.compactions_total.inc();
1039        assert_eq!(metrics.compactions_total.get(), 1);
1040
1041        // Record duration
1042        metrics.compaction_duration_seconds.observe(5.2);
1043
1044        // Files merged
1045        metrics.compaction_files_merged.inc_by(5);
1046
1047        // Bytes saved
1048        metrics.compaction_bytes_saved.inc_by(1024 * 1024);
1049    }
1050
1051    #[test]
1052    fn test_snapshot_metrics() {
1053        let metrics = MetricsRegistry::new();
1054
1055        // Create snapshot
1056        metrics.snapshots_created_total.inc();
1057        assert_eq!(metrics.snapshots_created_total.get(), 1);
1058
1059        // Record duration
1060        metrics.snapshot_creation_duration.observe(0.5);
1061
1062        // Total snapshots
1063        metrics.snapshots_total.set(10);
1064        assert_eq!(metrics.snapshots_total.get(), 10);
1065    }
1066
1067    #[test]
1068    fn test_replication_leader_metrics() {
1069        let metrics = MetricsRegistry::new();
1070
1071        // Follower connected
1072        metrics.replication_followers_connected.set(2);
1073        assert_eq!(metrics.replication_followers_connected.get(), 2);
1074
1075        // WAL entries shipped
1076        metrics.replication_wal_shipped_total.inc_by(100);
1077        assert_eq!(metrics.replication_wal_shipped_total.get(), 100);
1078
1079        // Bytes shipped
1080        metrics
1081            .replication_wal_shipped_bytes_total
1082            .inc_by(1024 * 1024);
1083        assert_eq!(
1084            metrics.replication_wal_shipped_bytes_total.get(),
1085            1024 * 1024
1086        );
1087
1088        // Per-follower lag
1089        metrics
1090            .replication_follower_lag_seconds
1091            .with_label_values(&["follower-1"])
1092            .set(5);
1093        assert_eq!(
1094            metrics
1095                .replication_follower_lag_seconds
1096                .with_label_values(&["follower-1"])
1097                .get(),
1098            5
1099        );
1100
1101        // ACKs received
1102        metrics.replication_acks_total.inc_by(50);
1103        assert_eq!(metrics.replication_acks_total.get(), 50);
1104    }
1105
1106    #[test]
1107    fn test_replication_follower_metrics() {
1108        let metrics = MetricsRegistry::new();
1109
1110        // WAL entries received
1111        metrics.replication_wal_received_total.inc_by(200);
1112        assert_eq!(metrics.replication_wal_received_total.get(), 200);
1113
1114        // WAL entries replayed
1115        metrics.replication_wal_replayed_total.inc_by(195);
1116        assert_eq!(metrics.replication_wal_replayed_total.get(), 195);
1117
1118        // Lag behind leader
1119        metrics.replication_lag_seconds.set(3);
1120        assert_eq!(metrics.replication_lag_seconds.get(), 3);
1121
1122        // Connected state
1123        metrics.replication_connected.set(1);
1124        assert_eq!(metrics.replication_connected.get(), 1);
1125
1126        // Reconnects
1127        metrics.replication_reconnects_total.inc_by(2);
1128        assert_eq!(metrics.replication_reconnects_total.get(), 2);
1129    }
1130
1131    #[test]
1132    fn test_replication_metrics_in_encode() {
1133        let metrics = MetricsRegistry::new();
1134
1135        metrics.replication_followers_connected.set(1);
1136        metrics.replication_wal_shipped_total.inc();
1137        metrics.replication_connected.set(1);
1138
1139        let encoded = metrics.encode().unwrap();
1140        assert!(encoded.contains("allsource_replication_followers_connected"));
1141        assert!(encoded.contains("allsource_replication_wal_shipped_total"));
1142        assert!(encoded.contains("allsource_replication_connected"));
1143    }
1144
1145    #[test]
1146    fn test_http_metrics() {
1147        let metrics = MetricsRegistry::new();
1148
1149        // Record request
1150        metrics
1151            .http_requests_total
1152            .with_label_values(&["GET", "/api/events", "200"])
1153            .inc();
1154
1155        // Record duration
1156        metrics
1157            .http_request_duration_seconds
1158            .with_label_values(&["GET", "/api/events"])
1159            .observe(0.025);
1160
1161        // In-flight requests
1162        metrics.http_requests_in_flight.inc();
1163        assert_eq!(metrics.http_requests_in_flight.get(), 1);
1164
1165        metrics.http_requests_in_flight.dec();
1166        assert_eq!(metrics.http_requests_in_flight.get(), 0);
1167    }
1168}
1169
1170// =============================================================================
1171// Partition Metrics (SierraDB pattern)
1172// =============================================================================
1173
/// Per-partition statistics for detecting hot partitions and skew
///
/// SierraDB uses 32 fixed partitions for single-node, 1024+ for clusters.
/// This struct tracks metrics per partition to detect imbalances.
///
/// All fields are plain integers, so the snapshot derives `Copy` and the
/// equality traits, letting reporting and test code pass it around and
/// compare it freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionStats {
    /// Partition ID (0 to partition_count-1)
    pub partition_id: u32,

    /// Total events written to this partition
    pub event_count: u64,

    /// Total write latency sum (nanoseconds) for calculating average
    pub total_latency_ns: u64,

    /// Number of writes for calculating average latency
    pub write_count: u64,

    /// Minimum write latency (nanoseconds); `u64::MAX` until the first write
    pub min_latency_ns: u64,

    /// Maximum write latency (nanoseconds)
    pub max_latency_ns: u64,

    /// Total error count for this partition
    pub error_count: u64,
}
1201
1202impl PartitionStats {
1203    fn new(partition_id: u32) -> Self {
1204        Self {
1205            partition_id,
1206            event_count: 0,
1207            total_latency_ns: 0,
1208            write_count: 0,
1209            min_latency_ns: u64::MAX,
1210            max_latency_ns: 0,
1211            error_count: 0,
1212        }
1213    }
1214
1215    /// Calculate average write latency
1216    pub fn avg_latency(&self) -> Option<Duration> {
1217        self.total_latency_ns
1218            .checked_div(self.write_count)
1219            .map(Duration::from_nanos)
1220    }
1221}
1222
/// Alert generated when partition imbalance is detected
///
/// Produced by `PartitionMetrics::detect_partition_imbalance`; derives
/// `Serialize` so callers can emit it directly (e.g. as JSON).
#[derive(Debug, Clone, Serialize)]
pub struct PartitionImbalanceAlert {
    /// Partition ID that is imbalanced
    pub partition_id: u32,

    /// Event count for this partition
    pub event_count: u64,

    /// Average event count across all active (non-empty) partitions
    pub average_count: f64,

    /// Ratio of this partition's count to the average (>2.0 triggers alert)
    pub ratio_to_average: f64,

    /// Human-readable alert message
    pub message: String,

    /// Timestamp when alert was generated
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
1244
/// Internal structure for tracking partition metrics atomically
///
/// One entry per partition; all updates use lock-free `Relaxed` atomics.
struct PartitionMetricsEntry {
    // Events written to this partition (a batch adds its full event count).
    event_count: AtomicU64,
    // Sum of write latencies in nanoseconds, for computing the average.
    total_latency_ns: AtomicU64,
    // Number of write operations (a batch counts once).
    write_count: AtomicU64,
    // Smallest observed latency; starts at u64::MAX so any write lowers it.
    min_latency_ns: AtomicU64,
    // Largest observed latency in nanoseconds.
    max_latency_ns: AtomicU64,
    // Errors recorded against this partition.
    error_count: AtomicU64,
}
1254
1255impl PartitionMetricsEntry {
1256    fn new() -> Self {
1257        Self {
1258            event_count: AtomicU64::new(0),
1259            total_latency_ns: AtomicU64::new(0),
1260            write_count: AtomicU64::new(0),
1261            min_latency_ns: AtomicU64::new(u64::MAX),
1262            max_latency_ns: AtomicU64::new(0),
1263            error_count: AtomicU64::new(0),
1264        }
1265    }
1266}
1267
1268/// Partition monitoring for detecting hot partitions and skew (SierraDB pattern)
1269///
1270/// # Design Pattern
1271/// Uses atomic operations for lock-free metric updates per partition.
1272/// Tracks event counts, write latencies, and error rates per partition.
1273///
1274/// # SierraDB Context
1275/// SierraDB uses fixed partitions (32 for single-node, 1024+ for clusters).
1276/// Detecting hot partitions is critical for:
1277/// - Load balancing decisions
1278/// - Identifying skewed hash functions
1279/// - Capacity planning
1280/// - Performance troubleshooting
1281///
1282/// # Imbalance Detection
1283/// A partition is considered imbalanced if it has >2x the average load.
1284/// This threshold is based on SierraDB's experience with production workloads.
1285///
1286/// # Example
1287/// ```ignore
1288/// let partition_metrics = PartitionMetrics::new(32);
1289///
1290/// // Record write to partition 5
1291/// let start = Instant::now();
1292/// // ... write operation ...
1293/// partition_metrics.record_write(5, start.elapsed());
1294///
1295/// // Check for imbalances
1296/// let alerts = partition_metrics.detect_partition_imbalance();
1297/// for alert in alerts {
1298///     tracing::warn!("Partition imbalance: {}", alert.message);
1299/// }
1300/// ```
pub struct PartitionMetrics {
    /// Number of partitions (fixed at construction time)
    partition_count: u32,

    /// Per-partition lock-free atomic counters, indexed by partition ID
    partitions: Vec<PartitionMetricsEntry>,

    /// Prometheus gauge vec: current event count per partition
    partition_events_total: IntGaugeVec,

    /// Prometheus histogram vec: write latency per partition, in seconds
    partition_write_latency: HistogramVec,

    /// Prometheus counter vec: error count per partition
    partition_errors_total: IntCounterVec,

    /// Dedicated Prometheus registry holding the three collectors above
    registry: Registry,

    /// Timestamp when metrics collection started (basis for `uptime()`)
    started_at: Instant,
}
1323
1324impl PartitionMetrics {
1325    /// Create a new partition metrics tracker
1326    ///
1327    /// # Arguments
1328    /// * `partition_count` - Number of partitions (default: 32 for single-node)
1329    pub fn new(partition_count: u32) -> Self {
1330        let registry = Registry::new();
1331
1332        // Per-partition event count gauge
1333        let partition_events_total = IntGaugeVec::new(
1334            Opts::new(
1335                "allsource_partition_events_total",
1336                "Total events per partition",
1337            ),
1338            &["partition_id"],
1339        )
1340        .expect("Failed to create partition_events_total metric");
1341
1342        // Per-partition write latency histogram
1343        let partition_write_latency = HistogramVec::new(
1344            HistogramOpts::new(
1345                "allsource_partition_write_latency_seconds",
1346                "Write latency per partition in seconds",
1347            )
1348            .buckets(vec![
1349                0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
1350            ]),
1351            &["partition_id"],
1352        )
1353        .expect("Failed to create partition_write_latency metric");
1354
1355        // Per-partition error counter
1356        let partition_errors_total = IntCounterVec::new(
1357            Opts::new(
1358                "allsource_partition_errors_total",
1359                "Total errors per partition",
1360            ),
1361            &["partition_id"],
1362        )
1363        .expect("Failed to create partition_errors_total metric");
1364
1365        // Register metrics
1366        registry
1367            .register(Box::new(partition_events_total.clone()))
1368            .expect("Failed to register partition_events_total");
1369        registry
1370            .register(Box::new(partition_write_latency.clone()))
1371            .expect("Failed to register partition_write_latency");
1372        registry
1373            .register(Box::new(partition_errors_total.clone()))
1374            .expect("Failed to register partition_errors_total");
1375
1376        // Initialize per-partition atomic counters
1377        let partitions = (0..partition_count)
1378            .map(|_| PartitionMetricsEntry::new())
1379            .collect();
1380
1381        Self {
1382            partition_count,
1383            partitions,
1384            partition_events_total,
1385            partition_write_latency,
1386            partition_errors_total,
1387            registry,
1388            started_at: Instant::now(),
1389        }
1390    }
1391
1392    /// Create partition metrics with default partition count (32)
1393    pub fn with_default_partitions() -> Self {
1394        Self::new(32)
1395    }
1396
1397    /// Record a successful write to a partition
1398    ///
1399    /// # Arguments
1400    /// * `partition_id` - The partition ID (0 to partition_count-1)
1401    /// * `latency` - The write latency
1402    #[inline]
1403    pub fn record_write(&self, partition_id: u32, latency: Duration) {
1404        if partition_id >= self.partition_count {
1405            return;
1406        }
1407
1408        let entry = &self.partitions[partition_id as usize];
1409        let latency_ns = latency.as_nanos() as u64;
1410
1411        // Update atomic counters
1412        entry.event_count.fetch_add(1, Ordering::Relaxed);
1413        entry.write_count.fetch_add(1, Ordering::Relaxed);
1414        entry
1415            .total_latency_ns
1416            .fetch_add(latency_ns, Ordering::Relaxed);
1417
1418        // Update min latency (compare-and-swap loop)
1419        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1420        while latency_ns < current_min {
1421            match entry.min_latency_ns.compare_exchange_weak(
1422                current_min,
1423                latency_ns,
1424                Ordering::Relaxed,
1425                Ordering::Relaxed,
1426            ) {
1427                Ok(_) => break,
1428                Err(actual) => current_min = actual,
1429            }
1430        }
1431
1432        // Update max latency (compare-and-swap loop)
1433        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1434        while latency_ns > current_max {
1435            match entry.max_latency_ns.compare_exchange_weak(
1436                current_max,
1437                latency_ns,
1438                Ordering::Relaxed,
1439                Ordering::Relaxed,
1440            ) {
1441                Ok(_) => break,
1442                Err(actual) => current_max = actual,
1443            }
1444        }
1445
1446        // Update Prometheus metrics
1447        let partition_id_str = partition_id.to_string();
1448        self.partition_events_total
1449            .with_label_values(&[&partition_id_str])
1450            .set(entry.event_count.load(Ordering::Relaxed) as i64);
1451        self.partition_write_latency
1452            .with_label_values(&[&partition_id_str])
1453            .observe(latency.as_secs_f64());
1454    }
1455
1456    /// Record an error for a partition
1457    ///
1458    /// # Arguments
1459    /// * `partition_id` - The partition ID (0 to partition_count-1)
1460    #[inline]
1461    pub fn record_error(&self, partition_id: u32) {
1462        if partition_id >= self.partition_count {
1463            return;
1464        }
1465
1466        let entry = &self.partitions[partition_id as usize];
1467        entry.error_count.fetch_add(1, Ordering::Relaxed);
1468
1469        // Update Prometheus metrics
1470        let partition_id_str = partition_id.to_string();
1471        self.partition_errors_total
1472            .with_label_values(&[&partition_id_str])
1473            .inc();
1474    }
1475
1476    /// Record a batch write to a partition
1477    ///
1478    /// # Arguments
1479    /// * `partition_id` - The partition ID (0 to partition_count-1)
1480    /// * `count` - Number of events in the batch
1481    /// * `latency` - Total latency for the batch write
1482    #[inline]
1483    pub fn record_batch_write(&self, partition_id: u32, count: u64, latency: Duration) {
1484        if partition_id >= self.partition_count {
1485            return;
1486        }
1487
1488        let entry = &self.partitions[partition_id as usize];
1489        let latency_ns = latency.as_nanos() as u64;
1490
1491        // Update atomic counters
1492        entry.event_count.fetch_add(count, Ordering::Relaxed);
1493        entry.write_count.fetch_add(1, Ordering::Relaxed);
1494        entry
1495            .total_latency_ns
1496            .fetch_add(latency_ns, Ordering::Relaxed);
1497
1498        // Update min/max latency using per-event average
1499        let per_event_latency_ns = latency_ns / count.max(1);
1500
1501        // Update min latency
1502        let mut current_min = entry.min_latency_ns.load(Ordering::Relaxed);
1503        while per_event_latency_ns < current_min {
1504            match entry.min_latency_ns.compare_exchange_weak(
1505                current_min,
1506                per_event_latency_ns,
1507                Ordering::Relaxed,
1508                Ordering::Relaxed,
1509            ) {
1510                Ok(_) => break,
1511                Err(actual) => current_min = actual,
1512            }
1513        }
1514
1515        // Update max latency
1516        let mut current_max = entry.max_latency_ns.load(Ordering::Relaxed);
1517        while per_event_latency_ns > current_max {
1518            match entry.max_latency_ns.compare_exchange_weak(
1519                current_max,
1520                per_event_latency_ns,
1521                Ordering::Relaxed,
1522                Ordering::Relaxed,
1523            ) {
1524                Ok(_) => break,
1525                Err(actual) => current_max = actual,
1526            }
1527        }
1528
1529        // Update Prometheus metrics
1530        let partition_id_str = partition_id.to_string();
1531        self.partition_events_total
1532            .with_label_values(&[&partition_id_str])
1533            .set(entry.event_count.load(Ordering::Relaxed) as i64);
1534        self.partition_write_latency
1535            .with_label_values(&[&partition_id_str])
1536            .observe(latency.as_secs_f64());
1537    }
1538
1539    /// Get statistics for a specific partition
1540    pub fn get_partition_stats(&self, partition_id: u32) -> Option<PartitionStats> {
1541        if partition_id >= self.partition_count {
1542            return None;
1543        }
1544
1545        let entry = &self.partitions[partition_id as usize];
1546
1547        Some(PartitionStats {
1548            partition_id,
1549            event_count: entry.event_count.load(Ordering::Relaxed),
1550            total_latency_ns: entry.total_latency_ns.load(Ordering::Relaxed),
1551            write_count: entry.write_count.load(Ordering::Relaxed),
1552            min_latency_ns: entry.min_latency_ns.load(Ordering::Relaxed),
1553            max_latency_ns: entry.max_latency_ns.load(Ordering::Relaxed),
1554            error_count: entry.error_count.load(Ordering::Relaxed),
1555        })
1556    }
1557
1558    /// Get statistics for all partitions
1559    pub fn get_all_partition_stats(&self) -> Vec<PartitionStats> {
1560        (0..self.partition_count)
1561            .filter_map(|id| self.get_partition_stats(id))
1562            .collect()
1563    }
1564
1565    /// Detect partition imbalance (hot partitions)
1566    ///
1567    /// Returns alerts for any partition with >2x average event count.
1568    /// This is the SierraDB pattern for detecting skew and hot partitions.
1569    ///
1570    /// # Returns
1571    /// Vector of alerts for imbalanced partitions
1572    pub fn detect_partition_imbalance(&self) -> Vec<PartitionImbalanceAlert> {
1573        let mut alerts = Vec::new();
1574        let stats = self.get_all_partition_stats();
1575
1576        // Calculate total and average event count
1577        let total_events: u64 = stats.iter().map(|s| s.event_count).sum();
1578        let active_partitions = stats.iter().filter(|s| s.event_count > 0).count();
1579
1580        if active_partitions == 0 {
1581            return alerts;
1582        }
1583
1584        let average_count = total_events as f64 / active_partitions as f64;
1585        let imbalance_threshold = 2.0; // SierraDB threshold: 2x average
1586
1587        for stat in stats {
1588            if stat.event_count == 0 {
1589                continue;
1590            }
1591
1592            let ratio = stat.event_count as f64 / average_count;
1593
1594            if ratio > imbalance_threshold {
1595                alerts.push(PartitionImbalanceAlert {
1596                    partition_id: stat.partition_id,
1597                    event_count: stat.event_count,
1598                    average_count,
1599                    ratio_to_average: ratio,
1600                    message: format!(
1601                        "Partition {} has {:.1}x average load ({} events vs {:.0} avg)",
1602                        stat.partition_id, ratio, stat.event_count, average_count
1603                    ),
1604                    timestamp: chrono::Utc::now(),
1605                });
1606            }
1607        }
1608
1609        alerts
1610    }
1611
1612    /// Get partition count
1613    pub fn partition_count(&self) -> u32 {
1614        self.partition_count
1615    }
1616
1617    /// Get total events across all partitions
1618    pub fn total_events(&self) -> u64 {
1619        self.partitions
1620            .iter()
1621            .map(|e| e.event_count.load(Ordering::Relaxed))
1622            .sum()
1623    }
1624
1625    /// Get total errors across all partitions
1626    pub fn total_errors(&self) -> u64 {
1627        self.partitions
1628            .iter()
1629            .map(|e| e.error_count.load(Ordering::Relaxed))
1630            .sum()
1631    }
1632
1633    /// Get uptime since metrics collection started
1634    pub fn uptime(&self) -> Duration {
1635        self.started_at.elapsed()
1636    }
1637
1638    /// Get the Prometheus registry for this partition metrics
1639    pub fn registry(&self) -> &Registry {
1640        &self.registry
1641    }
1642
1643    /// Encode metrics in Prometheus text format
1644    pub fn encode(&self) -> Result<String, Box<dyn std::error::Error>> {
1645        use prometheus::Encoder;
1646        let encoder = prometheus::TextEncoder::new();
1647        let metric_families = self.registry.gather();
1648        let mut buffer = Vec::new();
1649        encoder.encode(&metric_families, &mut buffer)?;
1650        Ok(String::from_utf8(buffer)?)
1651    }
1652
1653    /// Get partition distribution as a map
1654    pub fn get_distribution(&self) -> HashMap<u32, u64> {
1655        self.partitions
1656            .iter()
1657            .enumerate()
1658            .map(|(id, entry)| (id as u32, entry.event_count.load(Ordering::Relaxed)))
1659            .collect()
1660    }
1661
1662    /// Reset all partition metrics
1663    pub fn reset(&self) {
1664        for entry in &self.partitions {
1665            entry.event_count.store(0, Ordering::Relaxed);
1666            entry.total_latency_ns.store(0, Ordering::Relaxed);
1667            entry.write_count.store(0, Ordering::Relaxed);
1668            entry.min_latency_ns.store(u64::MAX, Ordering::Relaxed);
1669            entry.max_latency_ns.store(0, Ordering::Relaxed);
1670            entry.error_count.store(0, Ordering::Relaxed);
1671        }
1672    }
1673}
1674
1675impl Default for PartitionMetrics {
1676    fn default() -> Self {
1677        Self::with_default_partitions()
1678    }
1679}
1680
1681#[cfg(test)]
1682mod partition_tests {
1683    use super::*;
1684    use std::thread;
1685
1686    #[test]
1687    fn test_partition_metrics_creation() {
1688        let metrics = PartitionMetrics::new(32);
1689        assert_eq!(metrics.partition_count(), 32);
1690        assert_eq!(metrics.total_events(), 0);
1691        assert_eq!(metrics.total_errors(), 0);
1692    }
1693
1694    #[test]
1695    fn test_partition_metrics_default() {
1696        let metrics = PartitionMetrics::default();
1697        assert_eq!(metrics.partition_count(), 32);
1698    }
1699
1700    #[test]
1701    fn test_record_write() {
1702        let metrics = PartitionMetrics::new(32);
1703
1704        metrics.record_write(0, Duration::from_micros(100));
1705        metrics.record_write(0, Duration::from_micros(200));
1706        metrics.record_write(1, Duration::from_micros(150));
1707
1708        let stats0 = metrics.get_partition_stats(0).unwrap();
1709        assert_eq!(stats0.event_count, 2);
1710        assert_eq!(stats0.write_count, 2);
1711
1712        let stats1 = metrics.get_partition_stats(1).unwrap();
1713        assert_eq!(stats1.event_count, 1);
1714    }
1715
1716    #[test]
1717    fn test_record_batch_write() {
1718        let metrics = PartitionMetrics::new(32);
1719
1720        metrics.record_batch_write(5, 100, Duration::from_millis(10));
1721
1722        let stats = metrics.get_partition_stats(5).unwrap();
1723        assert_eq!(stats.event_count, 100);
1724        assert_eq!(stats.write_count, 1);
1725    }
1726
1727    #[test]
1728    fn test_record_error() {
1729        let metrics = PartitionMetrics::new(32);
1730
1731        metrics.record_error(3);
1732        metrics.record_error(3);
1733        metrics.record_error(5);
1734
1735        let stats3 = metrics.get_partition_stats(3).unwrap();
1736        assert_eq!(stats3.error_count, 2);
1737
1738        let stats5 = metrics.get_partition_stats(5).unwrap();
1739        assert_eq!(stats5.error_count, 1);
1740
1741        assert_eq!(metrics.total_errors(), 3);
1742    }
1743
1744    #[test]
1745    fn test_invalid_partition_id() {
1746        let metrics = PartitionMetrics::new(32);
1747
1748        // Should not panic, just ignore invalid partition IDs
1749        metrics.record_write(100, Duration::from_micros(100));
1750        metrics.record_error(100);
1751
1752        assert!(metrics.get_partition_stats(100).is_none());
1753    }
1754
1755    #[test]
1756    fn test_latency_tracking() {
1757        let metrics = PartitionMetrics::new(32);
1758
1759        metrics.record_write(0, Duration::from_micros(100));
1760        metrics.record_write(0, Duration::from_micros(200));
1761        metrics.record_write(0, Duration::from_micros(300));
1762
1763        let stats = metrics.get_partition_stats(0).unwrap();
1764        assert_eq!(stats.min_latency_ns, 100_000); // 100 microseconds in nanoseconds
1765        assert_eq!(stats.max_latency_ns, 300_000); // 300 microseconds in nanoseconds
1766
1767        let avg = stats.avg_latency().unwrap();
1768        assert_eq!(avg, Duration::from_nanos(200_000)); // Average: 200 microseconds
1769    }
1770
1771    #[test]
1772    fn test_detect_partition_imbalance_no_imbalance() {
1773        let metrics = PartitionMetrics::new(4);
1774
1775        // Distribute events evenly
1776        for i in 0..4 {
1777            for _ in 0..100 {
1778                metrics.record_write(i, Duration::from_micros(100));
1779            }
1780        }
1781
1782        let alerts = metrics.detect_partition_imbalance();
1783        assert!(
1784            alerts.is_empty(),
1785            "No alerts expected for balanced partitions"
1786        );
1787    }
1788
1789    #[test]
1790    fn test_detect_partition_imbalance_hot_partition() {
1791        let metrics = PartitionMetrics::new(4);
1792
1793        // Partition 0 gets 500 events, others get 100 each
1794        // Average = (500 + 100 + 100 + 100) / 4 = 200
1795        // Partition 0 ratio = 500/200 = 2.5x (>2x threshold)
1796        for _ in 0..500 {
1797            metrics.record_write(0, Duration::from_micros(100));
1798        }
1799        for i in 1..4 {
1800            for _ in 0..100 {
1801                metrics.record_write(i, Duration::from_micros(100));
1802            }
1803        }
1804
1805        let alerts = metrics.detect_partition_imbalance();
1806        assert_eq!(alerts.len(), 1, "Expected one alert for hot partition");
1807        assert_eq!(alerts[0].partition_id, 0);
1808        assert!(alerts[0].ratio_to_average > 2.0);
1809    }
1810
1811    #[test]
1812    fn test_detect_partition_imbalance_empty() {
1813        let metrics = PartitionMetrics::new(4);
1814
1815        let alerts = metrics.detect_partition_imbalance();
1816        assert!(alerts.is_empty(), "No alerts expected for empty metrics");
1817    }
1818
1819    #[test]
1820    fn test_get_all_partition_stats() {
1821        let metrics = PartitionMetrics::new(4);
1822
1823        metrics.record_write(0, Duration::from_micros(100));
1824        metrics.record_write(2, Duration::from_micros(200));
1825
1826        let all_stats = metrics.get_all_partition_stats();
1827        assert_eq!(all_stats.len(), 4);
1828        assert_eq!(all_stats[0].event_count, 1);
1829        assert_eq!(all_stats[1].event_count, 0);
1830        assert_eq!(all_stats[2].event_count, 1);
1831        assert_eq!(all_stats[3].event_count, 0);
1832    }
1833
1834    #[test]
1835    fn test_prometheus_encoding() {
1836        let metrics = PartitionMetrics::new(4);
1837
1838        metrics.record_write(0, Duration::from_micros(100));
1839        metrics.record_write(1, Duration::from_micros(200));
1840        metrics.record_error(0);
1841
1842        let encoded = metrics.encode().unwrap();
1843
1844        assert!(encoded.contains("allsource_partition_events_total"));
1845        assert!(encoded.contains("allsource_partition_write_latency"));
1846        assert!(encoded.contains("allsource_partition_errors_total"));
1847    }
1848
1849    #[test]
1850    fn test_reset() {
1851        let metrics = PartitionMetrics::new(4);
1852
1853        metrics.record_write(0, Duration::from_micros(100));
1854        metrics.record_error(1);
1855
1856        assert_eq!(metrics.total_events(), 1);
1857        assert_eq!(metrics.total_errors(), 1);
1858
1859        metrics.reset();
1860
1861        assert_eq!(metrics.total_events(), 0);
1862        assert_eq!(metrics.total_errors(), 0);
1863    }
1864
1865    #[test]
1866    fn test_concurrent_writes() {
1867        let metrics = Arc::new(PartitionMetrics::new(32));
1868        let mut handles = vec![];
1869
1870        // Spawn 8 threads, each writing 1000 events to random partitions
1871        for _ in 0..8 {
1872            let metrics_clone = metrics.clone();
1873            let handle = thread::spawn(move || {
1874                for i in 0..1000 {
1875                    let partition_id = (i % 32) as u32;
1876                    metrics_clone.record_write(partition_id, Duration::from_micros(100));
1877                }
1878            });
1879            handles.push(handle);
1880        }
1881
1882        for handle in handles {
1883            handle.join().unwrap();
1884        }
1885
1886        assert_eq!(metrics.total_events(), 8000);
1887    }
1888
1889    #[test]
1890    fn test_get_distribution() {
1891        let metrics = PartitionMetrics::new(4);
1892
1893        metrics.record_write(0, Duration::from_micros(100));
1894        metrics.record_write(0, Duration::from_micros(100));
1895        metrics.record_write(2, Duration::from_micros(100));
1896
1897        let distribution = metrics.get_distribution();
1898
1899        assert_eq!(distribution.get(&0), Some(&2));
1900        assert_eq!(distribution.get(&1), Some(&0));
1901        assert_eq!(distribution.get(&2), Some(&1));
1902        assert_eq!(distribution.get(&3), Some(&0));
1903    }
1904
1905    #[test]
1906    fn test_partition_stats_avg_latency_none() {
1907        let stats = PartitionStats::new(0);
1908        assert!(stats.avg_latency().is_none());
1909    }
1910
1911    #[test]
1912    fn test_alert_message_format() {
1913        let metrics = PartitionMetrics::new(4);
1914
1915        // Create imbalanced scenario
1916        for _ in 0..1000 {
1917            metrics.record_write(0, Duration::from_micros(100));
1918        }
1919        for i in 1..4 {
1920            for _ in 0..100 {
1921                metrics.record_write(i, Duration::from_micros(100));
1922            }
1923        }
1924
1925        let alerts = metrics.detect_partition_imbalance();
1926        assert!(!alerts.is_empty());
1927
1928        let alert = &alerts[0];
1929        assert!(alert.message.contains("Partition 0"));
1930        assert!(alert.message.contains("average load"));
1931    }
1932
1933    #[test]
1934    fn test_uptime() {
1935        let metrics = PartitionMetrics::new(4);
1936
1937        // Sleep a bit to ensure non-zero uptime
1938        thread::sleep(Duration::from_millis(10));
1939
1940        let uptime = metrics.uptime();
1941        assert!(uptime.as_millis() >= 10);
1942    }
1943}