hyperi_rustlib/metrics/dfe_groups/
sink.rs1use metrics::Gauge;
12
13use super::super::MetricsManager;
14use super::super::manifest::{MetricDescriptor, MetricType};
15
16#[derive(Clone)]
20pub struct SinkMetrics {
21 pub concurrent_inserts: Gauge,
22 namespace: String,
23}
24
25impl SinkMetrics {
26 #[must_use]
27 pub fn new(manager: &MetricsManager) -> Self {
28 let ns = manager.namespace();
29
30 let dur_key = if ns.is_empty() {
32 "sink_duration_seconds".to_string()
33 } else {
34 format!("{ns}_sink_duration_seconds")
35 };
36 metrics::describe_histogram!(
37 dur_key.clone(),
38 metrics::Unit::Seconds,
39 "Sink write latency"
40 );
41 manager.registry().push(MetricDescriptor {
42 name: dur_key,
43 metric_type: MetricType::Histogram,
44 description: "Sink write latency".into(),
45 unit: "seconds".into(),
46 labels: vec!["backend".into()],
47 group: "sink".into(),
48 buckets: None,
49 use_cases: vec![],
50 dashboard_hint: None,
51 });
52
53 let err_key = if ns.is_empty() {
55 "sink_errors_total".to_string()
56 } else {
57 format!("{ns}_sink_errors_total")
58 };
59 metrics::describe_counter!(err_key.clone(), "Sink write errors");
60 manager.registry().push(MetricDescriptor {
61 name: err_key,
62 metric_type: MetricType::Counter,
63 description: "Sink write errors".into(),
64 unit: String::new(),
65 labels: vec!["backend".into()],
66 group: "sink".into(),
67 buckets: None,
68 use_cases: vec![],
69 dashboard_hint: None,
70 });
71
72 let bytes_key = if ns.is_empty() {
74 "bytes_sent_total".to_string()
75 } else {
76 format!("{ns}_bytes_sent_total")
77 };
78 metrics::describe_counter!(bytes_key.clone(), "Bytes sent to sink");
79 manager.registry().push(MetricDescriptor {
80 name: bytes_key,
81 metric_type: MetricType::Counter,
82 description: "Bytes sent to sink".into(),
83 unit: String::new(),
84 labels: vec!["format".into()],
85 group: "sink".into(),
86 buckets: None,
87 use_cases: vec![],
88 dashboard_hint: None,
89 });
90
91 Self {
92 concurrent_inserts: manager.gauge_with_labels(
93 "concurrent_inserts",
94 "In-flight insert/write operations",
95 &[],
96 "sink",
97 ),
98 namespace: ns.to_string(),
99 }
100 }
101
102 #[inline]
104 pub fn record_duration(&self, backend: &str, seconds: f64) {
105 let key = if self.namespace.is_empty() {
106 "sink_duration_seconds".to_string()
107 } else {
108 format!("{}_sink_duration_seconds", self.namespace)
109 };
110 metrics::histogram!(key, "backend" => backend.to_string()).record(seconds);
111 }
112
113 #[inline]
115 pub fn record_error(&self, backend: &str) {
116 let key = if self.namespace.is_empty() {
117 "sink_errors_total".to_string()
118 } else {
119 format!("{}_sink_errors_total", self.namespace)
120 };
121 metrics::counter!(key, "backend" => backend.to_string()).increment(1);
122 }
123
124 #[inline]
126 pub fn record_bytes_sent(&self, format: &str, bytes: u64) {
127 let key = if self.namespace.is_empty() {
128 "bytes_sent_total".to_string()
129 } else {
130 format!("{}_bytes_sent_total", self.namespace)
131 };
132 metrics::counter!(key, "format" => format.to_string()).increment(bytes);
133 }
134
135 #[inline]
136 pub fn set_concurrent_inserts(&self, count: usize) {
137 self.concurrent_inserts.set(count as f64);
138 }
139}