1use once_cell::sync::Lazy;
2use prometheus::{
3 Encoder, Histogram, HistogramOpts, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder,
4};
5
6pub static REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
7
8pub static OP_THROUGHPUT: Lazy<IntCounterVec> = Lazy::new(|| {
9 let c = IntCounterVec::new(
10 Opts::new("pulse_operator_records_total", "Records processed by operator"),
11 &["operator", "stage"],
12 )
13 .unwrap();
14 REGISTRY.register(Box::new(c.clone())).unwrap();
15 c
16});
17
18pub static LAG_WATERMARK_MS: Lazy<IntGauge> = Lazy::new(|| {
19 let g = IntGauge::new(
20 "pulse_watermark_lag_ms",
21 "Lag between now and current watermark in ms",
22 )
23 .unwrap();
24 REGISTRY.register(Box::new(g.clone())).unwrap();
25 g
26});
27
28pub static BYTES_WRITTEN: Lazy<IntCounterVec> = Lazy::new(|| {
29 let c = IntCounterVec::new(
30 Opts::new("pulse_bytes_written_total", "Total bytes written by sink"),
31 &["sink"],
32 )
33 .unwrap();
34 REGISTRY.register(Box::new(c.clone())).unwrap();
35 c
36});
37
38pub static STATE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
39 let g = IntGaugeVec::new(
40 Opts::new("pulse_state_size", "State size per operator"),
41 &["operator"],
42 )
43 .unwrap();
44 REGISTRY.register(Box::new(g.clone())).unwrap();
45 g
46});
47
48pub static OP_PROC_LATENCY_MS: Lazy<Histogram> = Lazy::new(|| {
49 let h = Histogram::with_opts(
50 HistogramOpts::new(
51 "pulse_operator_process_latency_ms",
52 "Operator on_element processing latency (ms)",
53 )
54 .buckets(vec![
55 0.1, 0.5, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0,
56 ]),
57 )
58 .unwrap();
59 REGISTRY.register(Box::new(h.clone())).unwrap();
60 h
61});
62
63pub static SINK_PROC_LATENCY_MS: Lazy<Histogram> = Lazy::new(|| {
64 let h = Histogram::with_opts(
65 HistogramOpts::new(
66 "pulse_sink_process_latency_ms",
67 "Sink on_element processing latency (ms)",
68 )
69 .buckets(vec![
70 0.1, 0.5, 1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0, 1000.0,
71 ]),
72 )
73 .unwrap();
74 REGISTRY.register(Box::new(h.clone())).unwrap();
75 h
76});
77
78pub static QUEUE_DEPTH: Lazy<IntGauge> = Lazy::new(|| {
79 let g = IntGauge::new(
80 "pulse_queue_depth",
81 "Current in-flight queue depth between source and operators",
82 )
83 .unwrap();
84 REGISTRY.register(Box::new(g.clone())).unwrap();
85 g
86});
87
88pub static DROPPED_RECORDS: Lazy<IntCounterVec> = Lazy::new(|| {
89 let c = IntCounterVec::new(
90 Opts::new(
91 "pulse_dropped_records_total",
92 "Total records dropped due to backpressure",
93 ),
94 &["reason"],
95 )
96 .unwrap();
97 REGISTRY.register(Box::new(c.clone())).unwrap();
98 c
99});
100
101pub fn render_prometheus() -> String {
102 let mut buffer = Vec::new();
103 let encoder = TextEncoder::new();
104 encoder.encode(®ISTRY.gather(), &mut buffer).ok();
105 String::from_utf8(buffer).unwrap_or_default()
106}