metrics_exporter_scope/
lib.rs

1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3#[cfg(feature = "msrv")]
4extern crate metrics_legacy as metrics;
5#[cfg(feature = "msrv")]
6extern crate metrics_util_legacy as metrics_util;
7
8use std::{
9    collections::BTreeMap,
10    net::{SocketAddr, TcpListener, TcpStream},
11    num::TryFromIntError,
12    sync::{atomic::Ordering, Arc},
13    thread,
14    time::Duration,
15};
16
17use bma_ts::Monotonic;
18use metrics::{Key, Recorder};
19use metrics_util::registry::{AtomicStorage, GenerationalStorage, Registry};
20use rtsc::time::interval;
21use serde::{Deserialize, Serialize};
22use tracing::{error, info};
23
24/// Crate error type
25#[derive(thiserror::Error, Debug)]
26pub enum Error {
27    /// I/O errors
28    #[error("io error: {0}")]
29    Io(#[from] std::io::Error),
30    /// Data serialization errors
31    #[error("encode error: {0}")]
32    Encode(#[from] rmp_serde::encode::Error),
33    /// Data deserialization errors
34    #[error("decode error: {0}")]
35    Decode(#[from] rmp_serde::decode::Error),
36    /// Recorder setup errors
37    #[error("set recorder error: {0}")]
38    SetRecorder(#[from] metrics::SetRecorderError<ScopeRecorder>),
39    /// Other errors
40    #[error("{0}")]
41    Other(String),
42}
43
44impl From<TryFromIntError> for Error {
45    fn from(error: TryFromIntError) -> Self {
46        Self::Other(error.to_string())
47    }
48}
49
// Read/write timeout applied to a client socket while the initial exchange
// (protocol version, client settings) is in progress.
const CLIENT_CHAT_TIMEOUT: Duration = Duration::from_secs(60);

// Minimum time between two consecutive `Info` packets sent to a connected client.
const SEND_INFO_INTERVAL: Duration = Duration::from_secs(5);

// Name given to the thread that accepts client connections.
const SERVER_THREAD_NAME: &str = "MScopeSrv";
55
56/// Communication protocol
57pub mod protocol {
58
59    /// Current protocol version
60    pub const VERSION: u16 = 1;
61
62    use std::io::{Read, Write};
63
64    use crate::{ClientSettings, Error, Packet};
65    use serde::{Deserialize, Serialize};
66
67    /// Read protocol version from a stream
68    pub fn read_version<R>(mut stream: R) -> Result<u16, Error>
69    where
70        R: Read,
71    {
72        let buf = &mut [0u8; 2];
73        stream.read_exact(buf)?;
74        Ok(u16::from_le_bytes(*buf))
75    }
76
77    /// Write protocol version to a stream
78    pub fn write_version<W>(mut stream: W) -> Result<(), Error>
79    where
80        W: Write,
81    {
82        stream.write_all(&VERSION.to_le_bytes())?;
83        Ok(())
84    }
85
86    /// Read a packet from a stream
87    pub fn read_packet<R>(stream: R) -> Result<Packet, Error>
88    where
89        R: Read,
90    {
91        read(stream)
92    }
93
94    /// Write a packet to a stream
95    pub fn write_packet<W>(stream: W, packet: &Packet) -> Result<(), Error>
96    where
97        W: Write,
98    {
99        write(stream, packet)
100    }
101
102    /// Read client settings from a stream
103    pub fn read_client_settings<R>(stream: R) -> Result<ClientSettings, Error>
104    where
105        R: Read,
106    {
107        read(stream)
108    }
109
110    /// Write client settings to a stream
111    pub fn write_client_settings<W>(stream: W, settings: &ClientSettings) -> Result<(), Error>
112    where
113        W: Write,
114    {
115        write(stream, settings)
116    }
117
118    fn write<D, W>(mut stream: W, data: D) -> Result<(), Error>
119    where
120        W: Write,
121        D: Serialize,
122    {
123        let data = rmp_serde::to_vec_named(&data)?;
124        stream.write_all(&u32::try_from(data.len())?.to_le_bytes())?;
125        stream.write_all(&data)?;
126        Ok(())
127    }
128
129    fn read<R, D>(mut stream: R) -> Result<D, Error>
130    where
131        R: Read,
132        D: for<'de> Deserialize<'de>,
133    {
134        let buf = &mut [0u8; 4];
135        stream.read_exact(buf)?;
136        let len = usize::try_from(u32::from_le_bytes(*buf))?;
137        let mut buf = vec![0u8; len];
138        stream.read_exact(&mut buf)?;
139        Ok(rmp_serde::from_slice(&buf)?)
140    }
141}
142
143/// Communication packets
144#[derive(Clone, Serialize, Deserialize, Debug)]
145#[serde(untagged)]
146pub enum Packet {
147    /// Information packet (metrics metadata)
148    Info(Info),
149    /// Snapshot packet (metrics data)
150    Snapshot(Snapshot),
151}
152
153/// Client settings
154#[derive(Clone, Serialize, Deserialize, Debug)]
155pub struct ClientSettings {
156    sampling_interval: u64,
157}
158
159impl ClientSettings {
160    /// # Panics
161    ///
162    /// Panics if the duration is too large to fit into a u64.
163    pub fn new(sampling_interval: Duration) -> Self {
164        Self {
165            sampling_interval: u64::try_from(sampling_interval.as_nanos()).unwrap(),
166        }
167    }
168}
169
170/// Information packet
171#[derive(Clone, Serialize, Deserialize, Debug)]
172pub struct Info {
173    metrics: BTreeMap<String, MetricInfo>,
174}
175
176impl Info {
177    /// Get metrics metadata map
178    pub fn metrics(&self) -> &BTreeMap<String, MetricInfo> {
179        &self.metrics
180    }
181}
182
183/// Metrics metadata
184#[derive(Clone, Serialize, Deserialize, Debug)]
185pub struct MetricInfo {
186    labels: BTreeMap<String, String>,
187}
188
189impl MetricInfo {
190    /// Metric labels map
191    pub fn labels(&self) -> &BTreeMap<String, String> {
192        &self.labels
193    }
194}
195
196/// Snapshot packet
197#[derive(Clone, Serialize, Deserialize, Debug)]
198pub struct Snapshot {
199    t: Monotonic,
200    d: BTreeMap<String, f64>,
201}
202
203impl Snapshot {
204    /// Snapshot timestamp (monotonic, relative to the communication start)
205    pub fn ts(&self) -> Monotonic {
206        self.t
207    }
208    /// Snapshot data map (metric name -> value)
209    pub fn data(&self) -> &BTreeMap<String, f64> {
210        &self.d
211    }
212    /// Snapshot data map mutable (metric name -> value)
213    pub fn data_mut(&mut self) -> &mut BTreeMap<String, f64> {
214        &mut self.d
215    }
216    /// Take snapshot data map
217    pub fn take_data(&mut self) -> BTreeMap<String, f64> {
218        std::mem::take(&mut self.d)
219    }
220}
221
222/// Exporter builder
223pub struct ScopeBuilder {
224    addr: SocketAddr,
225    fallback: Option<Box<dyn Recorder + Send + Sync>>,
226}
227
228impl Default for ScopeBuilder {
229    fn default() -> Self {
230        Self::new()
231    }
232}
233
234impl ScopeBuilder {
235    /// Create a new exporter builder
236    pub fn new() -> Self {
237        Self {
238            addr: (std::net::Ipv4Addr::UNSPECIFIED, 5001).into(),
239            fallback: None,
240        }
241    }
242    /// Set the server listening address and port
243    pub fn with_addr<A: Into<SocketAddr>>(mut self, addr: A) -> Self {
244        self.addr = addr.into();
245        self
246    }
247    /// Set the fallback recorder
248    pub fn with_fallback(mut self, fallback: Box<dyn Recorder + Send + Sync>) -> Self {
249        self.fallback = Some(fallback);
250        self
251    }
252    /// Build the exporter's recorder
253    pub fn build(self) -> ScopeRecorder {
254        ScopeRecorder::build(self.addr, self.fallback)
255    }
256    /// Build the exporter's recorder and install it as the global recorder
257    pub fn install(self) -> Result<(), Error> {
258        self.build().install()
259    }
260}
261
262/// Scope recorder
263#[derive(Clone)]
264pub struct ScopeRecorder {
265    inner: Arc<Inner>,
266    fallback: Arc<Option<Box<dyn Recorder + Send + Sync>>>,
267}
268
269impl ScopeRecorder {
270    fn build<A: Into<SocketAddr>>(
271        addr: A,
272        fallback: Option<Box<dyn Recorder + Send + Sync>>,
273    ) -> Self {
274        Self {
275            inner: Arc::new(Inner::new(addr.into())),
276            fallback: fallback.into(),
277        }
278    }
279    /// # Panics
280    ///
281    /// Panics if the global recorder has already been set.
282    pub fn install(self) -> Result<(), Error> {
283        self.spawn_tasks()?;
284        metrics::set_global_recorder(self).map_err(Into::into)
285    }
286    fn spawn_tasks(&self) -> Result<(), std::io::Error> {
287        self.inner.spawn_server(self.inner.addr)?;
288        Ok(())
289    }
290}
291
292struct Inner {
293    registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
294    addr: SocketAddr,
295}
296
297impl Inner {
298    fn new(addr: SocketAddr) -> Self {
299        let registry = Registry::new(GenerationalStorage::new(AtomicStorage));
300        Self { registry, addr }
301    }
302    fn snapshot(&self, t: Monotonic) -> Snapshot {
303        let handles = self.registry.get_gauge_handles();
304        let mut map = BTreeMap::new();
305        for (key, gauge) in handles {
306            let name = key.name();
307            let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
308            map.insert(name[1..].to_string(), value);
309        }
310        Snapshot { t, d: map }
311    }
312    fn info(&self) -> Info {
313        let info = self
314            .registry
315            .get_gauge_handles()
316            .iter()
317            .map(|(key, _)| {
318                let labels = key
319                    .labels()
320                    .map(|label| (label.key().to_owned(), label.value().to_owned()));
321                (
322                    key.name()[1..].to_string(),
323                    MetricInfo {
324                        labels: labels.collect(),
325                    },
326                )
327            })
328            .collect();
329        Info { metrics: info }
330    }
331    fn spawn_server(self: &Arc<Self>, addr: SocketAddr) -> Result<(), std::io::Error> {
332        let listener = TcpListener::bind(addr)?;
333        let metrics_scope = self.clone();
334        thread::Builder::new()
335            .name(SERVER_THREAD_NAME.to_owned())
336            .spawn(move || {
337                while let Ok((stream, addr)) = listener.accept() {
338                    info!(?addr, "client connected");
339                    let metrics_scope = metrics_scope.clone();
340                    thread::spawn(move || {
341                        if let Err(error) = handle_client(stream, metrics_scope) {
342                            error!(?addr, ?error, "client error, disconnected");
343                        } else {
344                            info!(?addr, "client disconnected");
345                        }
346                    });
347                }
348            })?;
349        Ok(())
350    }
351}
352fn handle_client(mut stream: TcpStream, metrics_scope: Arc<Inner>) -> Result<(), Error> {
353    stream.set_read_timeout(Some(CLIENT_CHAT_TIMEOUT))?;
354    stream.set_write_timeout(Some(CLIENT_CHAT_TIMEOUT))?;
355    stream.set_nodelay(true)?;
356    protocol::write_version(&mut stream)?;
357    let clients_settings = protocol::read_client_settings(&mut stream)?;
358    stream.set_read_timeout(None)?;
359    stream.set_write_timeout(None)?;
360    protocol::write_packet(&mut stream, &Packet::Info(metrics_scope.info()))?;
361    let mut last_info_sent = Monotonic::now();
362    let int_ns = u128::from(clients_settings.sampling_interval);
363    let start = Monotonic::now();
364    for _ in interval(Duration::from_nanos(clients_settings.sampling_interval)) {
365        let ts = Monotonic::from_nanos(
366            (start.elapsed().as_nanos() / int_ns * int_ns)
367                .try_into()
368                .unwrap(),
369        );
370        let packet = Packet::Snapshot(metrics_scope.snapshot(ts));
371        if protocol::write_packet(&mut stream, &packet).is_err() {
372            break;
373        }
374        if last_info_sent.elapsed() >= SEND_INFO_INTERVAL {
375            let packet = Packet::Info(metrics_scope.info());
376            if protocol::write_packet(&mut stream, &packet).is_err() {
377                break;
378            }
379            last_info_sent = Monotonic::now();
380        }
381    }
382    Ok(())
383}
384
385impl Recorder for ScopeRecorder {
386    fn describe_counter(
387        &self,
388        key: metrics::KeyName,
389        unit: Option<metrics::Unit>,
390        description: metrics::SharedString,
391    ) {
392        if let Some(fallback) = self.fallback.as_ref() {
393            fallback.describe_counter(key, unit, description);
394        }
395    }
396
397    fn describe_gauge(
398        &self,
399        key: metrics::KeyName,
400        unit: Option<metrics::Unit>,
401        description: metrics::SharedString,
402    ) {
403        if let Some(fallback) = self.fallback.as_ref() {
404            fallback.describe_gauge(key, unit, description);
405        }
406    }
407
408    fn describe_histogram(
409        &self,
410        key: metrics::KeyName,
411        unit: Option<metrics::Unit>,
412        description: metrics::SharedString,
413    ) {
414        if let Some(fallback) = self.fallback.as_ref() {
415            fallback.describe_histogram(key, unit, description);
416        }
417    }
418
419    fn register_counter(
420        &self,
421        key: &metrics::Key,
422        metadata: &metrics::Metadata<'_>,
423    ) -> metrics::Counter {
424        if let Some(fallback) = self.fallback.as_ref() {
425            fallback.register_counter(key, metadata)
426        } else {
427            metrics::Counter::noop()
428        }
429    }
430
431    fn register_gauge(
432        &self,
433        key: &metrics::Key,
434        metadata: &metrics::Metadata<'_>,
435    ) -> metrics::Gauge {
436        if key.name().starts_with('~') {
437            self.inner
438                .registry
439                .get_or_create_gauge(key, |c| c.clone().into())
440        } else if let Some(fallback) = self.fallback.as_ref() {
441            fallback.register_gauge(key, metadata)
442        } else {
443            metrics::Gauge::noop()
444        }
445    }
446
447    fn register_histogram(
448        &self,
449        key: &metrics::Key,
450        metadata: &metrics::Metadata<'_>,
451    ) -> metrics::Histogram {
452        if let Some(fallback) = self.fallback.as_ref() {
453            fallback.register_histogram(key, metadata)
454        } else {
455            metrics::Histogram::noop()
456        }
457    }
458}