Skip to main content

hyperi_rustlib/metrics/dfe_groups/
sink.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/dfe_groups/sink.rs
3// Purpose:   DFE sink metrics group
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Sink/insert metrics for DFE apps with a downstream.
10
11use metrics::Gauge;
12
13use super::super::MetricsManager;
14use super::super::manifest::{MetricDescriptor, MetricType};
15
16/// Sink write metrics.
17///
18/// Tracks write latency, errors, bytes sent, and concurrent insert count.
19#[derive(Clone)]
20pub struct SinkMetrics {
21    pub concurrent_inserts: Gauge,
22    namespace: String,
23}
24
25impl SinkMetrics {
26    #[must_use]
27    pub fn new(manager: &MetricsManager) -> Self {
28        let ns = manager.namespace();
29
30        // sink_duration_seconds -- label-based, register descriptor manually
31        let dur_key = if ns.is_empty() {
32            "sink_duration_seconds".to_string()
33        } else {
34            format!("{ns}_sink_duration_seconds")
35        };
36        metrics::describe_histogram!(
37            dur_key.clone(),
38            metrics::Unit::Seconds,
39            "Sink write latency"
40        );
41        manager.registry().push(MetricDescriptor {
42            name: dur_key,
43            metric_type: MetricType::Histogram,
44            description: "Sink write latency".into(),
45            unit: "seconds".into(),
46            labels: vec!["backend".into()],
47            group: "sink".into(),
48            buckets: None,
49            use_cases: vec![],
50            dashboard_hint: None,
51        });
52
53        // sink_errors_total -- label-based
54        let err_key = if ns.is_empty() {
55            "sink_errors_total".to_string()
56        } else {
57            format!("{ns}_sink_errors_total")
58        };
59        metrics::describe_counter!(err_key.clone(), "Sink write errors");
60        manager.registry().push(MetricDescriptor {
61            name: err_key,
62            metric_type: MetricType::Counter,
63            description: "Sink write errors".into(),
64            unit: String::new(),
65            labels: vec!["backend".into()],
66            group: "sink".into(),
67            buckets: None,
68            use_cases: vec![],
69            dashboard_hint: None,
70        });
71
72        // bytes_sent_total -- label-based
73        let bytes_key = if ns.is_empty() {
74            "bytes_sent_total".to_string()
75        } else {
76            format!("{ns}_bytes_sent_total")
77        };
78        metrics::describe_counter!(bytes_key.clone(), "Bytes sent to sink");
79        manager.registry().push(MetricDescriptor {
80            name: bytes_key,
81            metric_type: MetricType::Counter,
82            description: "Bytes sent to sink".into(),
83            unit: String::new(),
84            labels: vec!["format".into()],
85            group: "sink".into(),
86            buckets: None,
87            use_cases: vec![],
88            dashboard_hint: None,
89        });
90
91        Self {
92            concurrent_inserts: manager.gauge_with_labels(
93                "concurrent_inserts",
94                "In-flight insert/write operations",
95                &[],
96                "sink",
97            ),
98            namespace: ns.to_string(),
99        }
100    }
101
102    /// Record a sink write with backend label.
103    #[inline]
104    pub fn record_duration(&self, backend: &str, seconds: f64) {
105        let key = if self.namespace.is_empty() {
106            "sink_duration_seconds".to_string()
107        } else {
108            format!("{}_sink_duration_seconds", self.namespace)
109        };
110        metrics::histogram!(key, "backend" => backend.to_string()).record(seconds);
111    }
112
113    /// Record a sink write error with backend label.
114    #[inline]
115    pub fn record_error(&self, backend: &str) {
116        let key = if self.namespace.is_empty() {
117            "sink_errors_total".to_string()
118        } else {
119            format!("{}_sink_errors_total", self.namespace)
120        };
121        metrics::counter!(key, "backend" => backend.to_string()).increment(1);
122    }
123
124    /// Record bytes sent with format label.
125    #[inline]
126    pub fn record_bytes_sent(&self, format: &str, bytes: u64) {
127        let key = if self.namespace.is_empty() {
128            "bytes_sent_total".to_string()
129        } else {
130            format!("{}_bytes_sent_total", self.namespace)
131        };
132        metrics::counter!(key, "format" => format.to_string()).increment(bytes);
133    }
134
135    #[inline]
136    pub fn set_concurrent_inserts(&self, count: usize) {
137        self.concurrent_inserts.set(count as f64);
138    }
139}