Skip to main content

metrics_sqlite/
recorder.rs

1use crate::{Event, RegisterType, SqliteExporter};
2use metrics::{
3    Counter, CounterFn, Gauge, GaugeFn, GaugeValue, Histogram, HistogramFn, Key, KeyName, Metadata,
4    Recorder, SharedString, Unit,
5};
6use std::{
7    sync::{Arc, mpsc::SyncSender},
8    time::SystemTime,
9};
10#[cfg(feature = "log_dropped_metrics")]
11use tracing::error;
12
13pub(crate) struct Handle {
14    sender: SyncSender<Event>,
15    key: Key,
16}
17impl CounterFn for Handle {
18    fn increment(&self, value: u64) {
19        match SystemTime::UNIX_EPOCH.elapsed() {
20            Ok(timestamp) => {
21                if let Err(_e) = self.sender.try_send(Event::IncrementCounter(
22                    timestamp,
23                    self.key.clone(),
24                    value,
25                )) {
26                    #[cfg(feature = "log_dropped_metrics")]
27                    error!(
28                        "Error sending metric to SQLite thread: {}, dropping metric",
29                        _e
30                    );
31                }
32            }
33            Err(_e) => {
34                #[cfg(feature = "log_dropped_metrics")]
35                error!("Failed to get system time: {}, dropping metric", _e);
36            }
37        }
38    }
39
40    fn absolute(&self, value: u64) {
41        match SystemTime::UNIX_EPOCH.elapsed() {
42            Ok(timestamp) => {
43                if let Err(_e) =
44                    self.sender
45                        .try_send(Event::AbsoluteCounter(timestamp, self.key.clone(), value))
46                {
47                    #[cfg(feature = "log_dropped_metrics")]
48                    error!(
49                        "Error sending metric to SQLite thread: {}, dropping metric",
50                        _e
51                    );
52                }
53            }
54            Err(_e) => {
55                #[cfg(feature = "log_dropped_metrics")]
56                error!("Failed to get system time: {}, dropping metric", _e);
57            }
58        }
59    }
60}
61impl GaugeFn for Handle {
62    fn increment(&self, value: f64) {
63        match SystemTime::UNIX_EPOCH.elapsed() {
64            Ok(timestamp) => {
65                if let Err(_e) = self.sender.try_send(Event::UpdateGauge(
66                    timestamp,
67                    self.key.clone(),
68                    GaugeValue::Increment(value),
69                )) {
70                    #[cfg(feature = "log_dropped_metrics")]
71                    error!(
72                        "Error sending metric to SQLite thread: {}, dropping metric",
73                        _e
74                    );
75                }
76            }
77            Err(_e) => {
78                #[cfg(feature = "log_dropped_metrics")]
79                error!("Failed to get system time: {}, dropping metric", _e);
80            }
81        }
82    }
83
84    fn decrement(&self, value: f64) {
85        match SystemTime::UNIX_EPOCH.elapsed() {
86            Ok(timestamp) => {
87                if let Err(_e) = self.sender.try_send(Event::UpdateGauge(
88                    timestamp,
89                    self.key.clone(),
90                    GaugeValue::Decrement(value),
91                )) {
92                    #[cfg(feature = "log_dropped_metrics")]
93                    error!(
94                        "Error sending metric to SQLite thread: {}, dropping metric",
95                        _e
96                    );
97                }
98            }
99            Err(_e) => {
100                #[cfg(feature = "log_dropped_metrics")]
101                error!("Failed to get system time: {}, dropping metric", _e);
102            }
103        }
104    }
105
106    fn set(&self, value: f64) {
107        match SystemTime::UNIX_EPOCH.elapsed() {
108            Ok(timestamp) => {
109                if let Err(_e) = self.sender.try_send(Event::UpdateGauge(
110                    timestamp,
111                    self.key.clone(),
112                    GaugeValue::Absolute(value),
113                )) {
114                    #[cfg(feature = "log_dropped_metrics")]
115                    error!(
116                        "Error sending metric to SQLite thread: {}, dropping metric",
117                        _e
118                    );
119                }
120            }
121            Err(_e) => {
122                #[cfg(feature = "log_dropped_metrics")]
123                error!("Failed to get system time: {}, dropping metric", _e);
124            }
125        }
126    }
127}
128impl HistogramFn for Handle {
129    fn record(&self, value: f64) {
130        match SystemTime::UNIX_EPOCH.elapsed() {
131            Ok(timestamp) => {
132                if let Err(_e) =
133                    self.sender
134                        .try_send(Event::UpdateHistogram(timestamp, self.key.clone(), value))
135                {
136                    #[cfg(feature = "log_dropped_metrics")]
137                    error!(
138                        "Error sending metric to SQLite thread: {}, dropping metric",
139                        _e
140                    );
141                }
142            }
143            Err(_e) => {
144                #[cfg(feature = "log_dropped_metrics")]
145                error!("Failed to get system time: {}, dropping metric", _e);
146            }
147        }
148    }
149}
150impl Recorder for SqliteExporter {
151    fn describe_counter(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
152        if let Err(e) = self.sender.try_send(Event::DescribeKey(
153            RegisterType::Counter,
154            key,
155            unit,
156            description,
157        )) {
158            self.log_send_failure("description", &e);
159        }
160    }
161
162    fn describe_gauge(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
163        if let Err(e) = self.sender.try_send(Event::DescribeKey(
164            RegisterType::Gauge,
165            key,
166            unit,
167            description,
168        )) {
169            self.log_send_failure("description", &e);
170        }
171    }
172
173    fn describe_histogram(&self, key: KeyName, unit: Option<Unit>, description: SharedString) {
174        if let Err(e) = self.sender.try_send(Event::DescribeKey(
175            RegisterType::Histogram,
176            key,
177            unit,
178            description,
179        )) {
180            self.log_send_failure("description", &e);
181        }
182    }
183
184    // Registration doesn't touch the worker channel: the previous implementation
185    // sent a `RegisterKey` event that the worker ignored, which under load
186    // produced the dominant share of "TrySendError::Full" spam.
187    fn register_counter(&self, key: &Key, _metadata: &Metadata) -> Counter {
188        Counter::from_arc(Arc::new(Handle {
189            sender: self.sender.clone(),
190            key: key.clone(),
191        }))
192    }
193
194    fn register_gauge(&self, key: &Key, _metadata: &Metadata) -> Gauge {
195        Gauge::from_arc(Arc::new(Handle {
196            sender: self.sender.clone(),
197            key: key.clone(),
198        }))
199    }
200
201    fn register_histogram(&self, key: &Key, _metadata: &Metadata) -> Histogram {
202        Histogram::from_arc(Arc::new(Handle {
203            sender: self.sender.clone(),
204            key: key.clone(),
205        }))
206    }
207}