1#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
2#![deny(missing_docs)]
3#[cfg(feature = "msrv")]
4extern crate metrics_legacy as metrics;
5#[cfg(feature = "msrv")]
6extern crate metrics_util_legacy as metrics_util;
7
8use std::{
9 collections::BTreeMap,
10 net::{SocketAddr, TcpListener, TcpStream},
11 num::TryFromIntError,
12 sync::{atomic::Ordering, Arc},
13 thread,
14 time::Duration,
15};
16
17use bma_ts::Monotonic;
18use metrics::{Key, Recorder};
19use metrics_util::registry::{AtomicStorage, GenerationalStorage, Registry};
20use rtsc::time::interval;
21use serde::{Deserialize, Serialize};
22use tracing::{error, info};
23
24#[derive(thiserror::Error, Debug)]
26pub enum Error {
27 #[error("io error: {0}")]
29 Io(#[from] std::io::Error),
30 #[error("encode error: {0}")]
32 Encode(#[from] rmp_serde::encode::Error),
33 #[error("decode error: {0}")]
35 Decode(#[from] rmp_serde::decode::Error),
36 #[error("set recorder error: {0}")]
38 SetRecorder(#[from] metrics::SetRecorderError<ScopeRecorder>),
39 #[error("{0}")]
41 Other(String),
42}
43
44impl From<TryFromIntError> for Error {
45 fn from(error: TryFromIntError) -> Self {
46 Self::Other(error.to_string())
47 }
48}
49
/// Socket read/write timeout applied to a client connection while the initial exchange
/// (protocol version, client settings) is in progress.
const CLIENT_CHAT_TIMEOUT: Duration = Duration::from_secs(60);

/// Minimum time between two `Info` packets re-sent to a connected client.
const SEND_INFO_INTERVAL: Duration = Duration::from_secs(5);

/// Name given to the thread that accepts client connections.
const SERVER_THREAD_NAME: &str = "MScopeSrv";
55
56pub mod protocol {
58
59 pub const VERSION: u16 = 1;
61
62 use std::io::{Read, Write};
63
64 use crate::{ClientSettings, Error, Packet};
65 use serde::{Deserialize, Serialize};
66
67 pub fn read_version<R>(mut stream: R) -> Result<u16, Error>
69 where
70 R: Read,
71 {
72 let buf = &mut [0u8; 2];
73 stream.read_exact(buf)?;
74 Ok(u16::from_le_bytes(*buf))
75 }
76
77 pub fn write_version<W>(mut stream: W) -> Result<(), Error>
79 where
80 W: Write,
81 {
82 stream.write_all(&VERSION.to_le_bytes())?;
83 Ok(())
84 }
85
86 pub fn read_packet<R>(stream: R) -> Result<Packet, Error>
88 where
89 R: Read,
90 {
91 read(stream)
92 }
93
94 pub fn write_packet<W>(stream: W, packet: &Packet) -> Result<(), Error>
96 where
97 W: Write,
98 {
99 write(stream, packet)
100 }
101
102 pub fn read_client_settings<R>(stream: R) -> Result<ClientSettings, Error>
104 where
105 R: Read,
106 {
107 read(stream)
108 }
109
110 pub fn write_client_settings<W>(stream: W, settings: &ClientSettings) -> Result<(), Error>
112 where
113 W: Write,
114 {
115 write(stream, settings)
116 }
117
118 fn write<D, W>(mut stream: W, data: D) -> Result<(), Error>
119 where
120 W: Write,
121 D: Serialize,
122 {
123 let data = rmp_serde::to_vec_named(&data)?;
124 stream.write_all(&u32::try_from(data.len())?.to_le_bytes())?;
125 stream.write_all(&data)?;
126 Ok(())
127 }
128
129 fn read<R, D>(mut stream: R) -> Result<D, Error>
130 where
131 R: Read,
132 D: for<'de> Deserialize<'de>,
133 {
134 let buf = &mut [0u8; 4];
135 stream.read_exact(buf)?;
136 let len = usize::try_from(u32::from_le_bytes(*buf))?;
137 let mut buf = vec![0u8; len];
138 stream.read_exact(&mut buf)?;
139 Ok(rmp_serde::from_slice(&buf)?)
140 }
141}
142
143#[derive(Clone, Serialize, Deserialize, Debug)]
145#[serde(untagged)]
146pub enum Packet {
147 Info(Info),
149 Snapshot(Snapshot),
151}
152
153#[derive(Clone, Serialize, Deserialize, Debug)]
155pub struct ClientSettings {
156 sampling_interval: u64,
157}
158
159impl ClientSettings {
160 pub fn new(sampling_interval: Duration) -> Self {
164 Self {
165 sampling_interval: u64::try_from(sampling_interval.as_nanos()).unwrap(),
166 }
167 }
168}
169
170#[derive(Clone, Serialize, Deserialize, Debug)]
172pub struct Info {
173 metrics: BTreeMap<String, MetricInfo>,
174}
175
176impl Info {
177 pub fn metrics(&self) -> &BTreeMap<String, MetricInfo> {
179 &self.metrics
180 }
181}
182
183#[derive(Clone, Serialize, Deserialize, Debug)]
185pub struct MetricInfo {
186 labels: BTreeMap<String, String>,
187}
188
189impl MetricInfo {
190 pub fn labels(&self) -> &BTreeMap<String, String> {
192 &self.labels
193 }
194}
195
196#[derive(Clone, Serialize, Deserialize, Debug)]
198pub struct Snapshot {
199 t: Monotonic,
200 d: BTreeMap<String, f64>,
201}
202
203impl Snapshot {
204 pub fn ts(&self) -> Monotonic {
206 self.t
207 }
208 pub fn data(&self) -> &BTreeMap<String, f64> {
210 &self.d
211 }
212 pub fn data_mut(&mut self) -> &mut BTreeMap<String, f64> {
214 &mut self.d
215 }
216 pub fn take_data(&mut self) -> BTreeMap<String, f64> {
218 std::mem::take(&mut self.d)
219 }
220}
221
222pub struct ScopeBuilder {
224 addr: SocketAddr,
225 fallback: Option<Box<dyn Recorder + Send + Sync>>,
226}
227
228impl Default for ScopeBuilder {
229 fn default() -> Self {
230 Self::new()
231 }
232}
233
234impl ScopeBuilder {
235 pub fn new() -> Self {
237 Self {
238 addr: (std::net::Ipv4Addr::UNSPECIFIED, 5001).into(),
239 fallback: None,
240 }
241 }
242 pub fn with_addr<A: Into<SocketAddr>>(mut self, addr: A) -> Self {
244 self.addr = addr.into();
245 self
246 }
247 pub fn with_fallback(mut self, fallback: Box<dyn Recorder + Send + Sync>) -> Self {
249 self.fallback = Some(fallback);
250 self
251 }
252 pub fn build(self) -> ScopeRecorder {
254 ScopeRecorder::build(self.addr, self.fallback)
255 }
256 pub fn install(self) -> Result<(), Error> {
258 self.build().install()
259 }
260}
261
262#[derive(Clone)]
264pub struct ScopeRecorder {
265 inner: Arc<Inner>,
266 fallback: Arc<Option<Box<dyn Recorder + Send + Sync>>>,
267}
268
269impl ScopeRecorder {
270 fn build<A: Into<SocketAddr>>(
271 addr: A,
272 fallback: Option<Box<dyn Recorder + Send + Sync>>,
273 ) -> Self {
274 Self {
275 inner: Arc::new(Inner::new(addr.into())),
276 fallback: fallback.into(),
277 }
278 }
279 pub fn install(self) -> Result<(), Error> {
283 self.spawn_tasks()?;
284 metrics::set_global_recorder(self).map_err(Into::into)
285 }
286 fn spawn_tasks(&self) -> Result<(), std::io::Error> {
287 self.inner.spawn_server(self.inner.addr)?;
288 Ok(())
289 }
290}
291
292struct Inner {
293 registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
294 addr: SocketAddr,
295}
296
297impl Inner {
298 fn new(addr: SocketAddr) -> Self {
299 let registry = Registry::new(GenerationalStorage::new(AtomicStorage));
300 Self { registry, addr }
301 }
302 fn snapshot(&self, t: Monotonic) -> Snapshot {
303 let handles = self.registry.get_gauge_handles();
304 let mut map = BTreeMap::new();
305 for (key, gauge) in handles {
306 let name = key.name();
307 let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
308 map.insert(name[1..].to_string(), value);
309 }
310 Snapshot { t, d: map }
311 }
312 fn info(&self) -> Info {
313 let info = self
314 .registry
315 .get_gauge_handles()
316 .iter()
317 .map(|(key, _)| {
318 let labels = key
319 .labels()
320 .map(|label| (label.key().to_owned(), label.value().to_owned()));
321 (
322 key.name()[1..].to_string(),
323 MetricInfo {
324 labels: labels.collect(),
325 },
326 )
327 })
328 .collect();
329 Info { metrics: info }
330 }
331 fn spawn_server(self: &Arc<Self>, addr: SocketAddr) -> Result<(), std::io::Error> {
332 let listener = TcpListener::bind(addr)?;
333 let metrics_scope = self.clone();
334 thread::Builder::new()
335 .name(SERVER_THREAD_NAME.to_owned())
336 .spawn(move || {
337 while let Ok((stream, addr)) = listener.accept() {
338 info!(?addr, "client connected");
339 let metrics_scope = metrics_scope.clone();
340 thread::spawn(move || {
341 if let Err(error) = handle_client(stream, metrics_scope) {
342 error!(?addr, ?error, "client error, disconnected");
343 } else {
344 info!(?addr, "client disconnected");
345 }
346 });
347 }
348 })?;
349 Ok(())
350 }
351}
352fn handle_client(mut stream: TcpStream, metrics_scope: Arc<Inner>) -> Result<(), Error> {
353 stream.set_read_timeout(Some(CLIENT_CHAT_TIMEOUT))?;
354 stream.set_write_timeout(Some(CLIENT_CHAT_TIMEOUT))?;
355 stream.set_nodelay(true)?;
356 protocol::write_version(&mut stream)?;
357 let clients_settings = protocol::read_client_settings(&mut stream)?;
358 stream.set_read_timeout(None)?;
359 stream.set_write_timeout(None)?;
360 protocol::write_packet(&mut stream, &Packet::Info(metrics_scope.info()))?;
361 let mut last_info_sent = Monotonic::now();
362 let int_ns = u128::from(clients_settings.sampling_interval);
363 let start = Monotonic::now();
364 for _ in interval(Duration::from_nanos(clients_settings.sampling_interval)) {
365 let ts = Monotonic::from_nanos(
366 (start.elapsed().as_nanos() / int_ns * int_ns)
367 .try_into()
368 .unwrap(),
369 );
370 let packet = Packet::Snapshot(metrics_scope.snapshot(ts));
371 if protocol::write_packet(&mut stream, &packet).is_err() {
372 break;
373 }
374 if last_info_sent.elapsed() >= SEND_INFO_INTERVAL {
375 let packet = Packet::Info(metrics_scope.info());
376 if protocol::write_packet(&mut stream, &packet).is_err() {
377 break;
378 }
379 last_info_sent = Monotonic::now();
380 }
381 }
382 Ok(())
383}
384
385impl Recorder for ScopeRecorder {
386 fn describe_counter(
387 &self,
388 key: metrics::KeyName,
389 unit: Option<metrics::Unit>,
390 description: metrics::SharedString,
391 ) {
392 if let Some(fallback) = self.fallback.as_ref() {
393 fallback.describe_counter(key, unit, description);
394 }
395 }
396
397 fn describe_gauge(
398 &self,
399 key: metrics::KeyName,
400 unit: Option<metrics::Unit>,
401 description: metrics::SharedString,
402 ) {
403 if let Some(fallback) = self.fallback.as_ref() {
404 fallback.describe_gauge(key, unit, description);
405 }
406 }
407
408 fn describe_histogram(
409 &self,
410 key: metrics::KeyName,
411 unit: Option<metrics::Unit>,
412 description: metrics::SharedString,
413 ) {
414 if let Some(fallback) = self.fallback.as_ref() {
415 fallback.describe_histogram(key, unit, description);
416 }
417 }
418
419 fn register_counter(
420 &self,
421 key: &metrics::Key,
422 metadata: &metrics::Metadata<'_>,
423 ) -> metrics::Counter {
424 if let Some(fallback) = self.fallback.as_ref() {
425 fallback.register_counter(key, metadata)
426 } else {
427 metrics::Counter::noop()
428 }
429 }
430
431 fn register_gauge(
432 &self,
433 key: &metrics::Key,
434 metadata: &metrics::Metadata<'_>,
435 ) -> metrics::Gauge {
436 if key.name().starts_with('~') {
437 self.inner
438 .registry
439 .get_or_create_gauge(key, |c| c.clone().into())
440 } else if let Some(fallback) = self.fallback.as_ref() {
441 fallback.register_gauge(key, metadata)
442 } else {
443 metrics::Gauge::noop()
444 }
445 }
446
447 fn register_histogram(
448 &self,
449 key: &metrics::Key,
450 metadata: &metrics::Metadata<'_>,
451 ) -> metrics::Histogram {
452 if let Some(fallback) = self.fallback.as_ref() {
453 fallback.register_histogram(key, metadata)
454 } else {
455 metrics::Histogram::noop()
456 }
457 }
458}