1use once_cell::sync::Lazy;
2use prometheus::{
3    Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder,
4};
5
6pub static REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
7
8pub static OP_THROUGHPUT: Lazy<IntCounterVec> = Lazy::new(|| {
9    let c = IntCounterVec::new(
10        Opts::new("pulse_operator_records_total", "Records processed by operator"),
11        &["operator", "stage"],
12    )
13    .unwrap();
14    REGISTRY.register(Box::new(c.clone())).unwrap();
15    c
16});
17
18pub static LAG_WATERMARK_MS: Lazy<IntGauge> = Lazy::new(|| {
19    let g = IntGauge::new(
20        "pulse_watermark_lag_ms",
21        "Lag between now and current watermark in ms",
22    )
23    .unwrap();
24    REGISTRY.register(Box::new(g.clone())).unwrap();
25    g
26});
27
28pub static BYTES_WRITTEN: Lazy<IntCounterVec> = Lazy::new(|| {
29    let c = IntCounterVec::new(
30        Opts::new("pulse_bytes_written_total", "Total bytes written by sink"),
31        &["sink"],
32    )
33    .unwrap();
34    REGISTRY.register(Box::new(c.clone())).unwrap();
35    c
36});
37
38pub static STATE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
39    let g = IntGaugeVec::new(
40        Opts::new("pulse_state_size", "State size per operator"),
41        &["operator"],
42    )
43    .unwrap();
44    REGISTRY.register(Box::new(g.clone())).unwrap();
45    g
46});
47
48pub static OP_PROC_LATENCY_MS: Lazy<Histogram> = Lazy::new(|| {
49    let h = Histogram::with_opts(
50        HistogramOpts::new(
51            "pulse_operator_process_latency_ms",
52            "Operator on_element processing latency (ms)",
53        )
54        .buckets(vec![
55            0.1, 0.5, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0,
56        ]),
57    )
58    .unwrap();
59    REGISTRY.register(Box::new(h.clone())).unwrap();
60    h
61});
62
63pub static SINK_PROC_LATENCY_MS: Lazy<Histogram> = Lazy::new(|| {
64    let h = Histogram::with_opts(
65        HistogramOpts::new(
66            "pulse_sink_process_latency_ms",
67            "Sink on_element processing latency (ms)",
68        )
69        .buckets(vec![
70            0.1, 0.5, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0,
71        ]),
72    )
73    .unwrap();
74    REGISTRY.register(Box::new(h.clone())).unwrap();
75    h
76});
77
78pub static QUEUE_DEPTH: Lazy<IntGauge> = Lazy::new(|| {
79    let g = IntGauge::new(
80        "pulse_queue_depth",
81        "Current in-flight queue depth between source and operators",
82    )
83    .unwrap();
84    REGISTRY.register(Box::new(g.clone())).unwrap();
85    g
86});
87
88pub static DROPPED_RECORDS: Lazy<IntCounterVec> = Lazy::new(|| {
89    let c = IntCounterVec::new(
90        Opts::new(
91            "pulse_dropped_records_total",
92            "Total records dropped due to backpressure",
93        ),
94        &["reason"],
95    )
96    .unwrap();
97    REGISTRY.register(Box::new(c.clone())).unwrap();
98    c
99});
100
101pub fn render_prometheus() -> String {
102    let mut buffer = Vec::new();
103    let encoder = TextEncoder::new();
104    encoder.encode(®ISTRY.gather(), &mut buffer).ok();
105    String::from_utf8(buffer).unwrap_or_default()
106}