1pub use metrics;
40
41mod cmd;
42mod handles;
43mod state;
44
45use cmd::*;
46use handles::*;
47use state::*;
48
49use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
50use std::sync::Arc;
51use std::sync::mpsc::{self, Receiver, Sender};
52use std::time::{Duration, Instant};
53
54pub struct MetricsLogger<F> {
55 tx: Sender<MetricsCmd>,
56 err_cb: F,
57}
58
59pub enum LogMode {
60 Immediate,
62 Periodic(u64),
64}
65
66impl<F> MetricsLogger<F>
67where
68 F: Fn(&str) + Copy + Send + Sync + 'static,
69{
70 pub fn new<F2>(mode: LogMode, log_cb: F2, err_cb: F) -> Self
71 where
72 F2: Fn(&str) + Copy + Send + Sync + 'static,
73 {
74 let (tx, rx) = mpsc::channel();
75 match mode {
76 LogMode::Immediate => Self::launch_immediate_mode(rx, log_cb),
77 LogMode::Periodic(log_interval_secs) => {
78 Self::launch_periodic_mode(rx, log_cb, log_interval_secs)
79 }
80 }
81 Self { tx, err_cb }
82 }
83
84 fn launch_immediate_mode<F2>(rx: Receiver<MetricsCmd>, log_cb: F2)
85 where
86 F2: Fn(&str) + Copy + Send + Sync + 'static,
87 {
88 std::thread::spawn(move || {
89 let mut state = MetricsState::new();
90 for cmd in rx.iter() {
91 state.update(cmd);
92 if let Some(logs) = state.output_logs() {
93 (log_cb)(&logs);
94 }
95 }
96 });
97 }
98
99 fn launch_periodic_mode<F2>(rx: Receiver<MetricsCmd>, log_cb: F2, log_interval_secs: u64)
100 where
101 F2: Fn(&str) + Copy + Send + Sync + 'static,
102 {
103 std::thread::spawn(move || {
104 let mut state = MetricsState::new();
105 let interval = Duration::from_secs(log_interval_secs);
106 let mut next_log_time = Instant::now() + interval;
107 loop {
108 match rx.recv_timeout(Duration::from_secs(log_interval_secs)) {
109 Ok(cmd) => {
110 state.update(cmd);
111 }
112 Err(mpsc::RecvTimeoutError::Timeout) => {}
113 Err(mpsc::RecvTimeoutError::Disconnected) => break,
114 }
115
116 let now = Instant::now();
117 if now >= next_log_time {
118 if let Some(logs) = state.output_logs() {
119 (log_cb)(&logs);
120 }
121 next_log_time = now + interval;
122 }
123 }
124 });
125 }
126}
127
128impl<F> Recorder for MetricsLogger<F>
129where
130 F: Fn(&str) + Copy + Send + Sync + 'static,
131{
132 fn describe_counter(&self, _name: KeyName, _unit: Option<Unit>, _description: SharedString) {}
133
134 fn describe_gauge(&self, _name: KeyName, _unit: Option<Unit>, _description: SharedString) {}
135
136 fn describe_histogram(&self, _name: KeyName, _unit: Option<Unit>, _description: SharedString) {}
137
138 fn register_counter(&self, key: &Key, _meta: &Metadata<'_>) -> Counter {
139 let name = key.name().to_string();
140 let handle = CounterHandle {
141 name,
142 tx: self.tx.clone(),
143 err_cb: self.err_cb,
144 };
145 Counter::from_arc(Arc::new(handle))
146 }
147
148 fn register_gauge(&self, key: &Key, _meta: &Metadata<'_>) -> Gauge {
149 let name = key.name().to_string();
150
151 let handle = GaugeHandle {
152 name,
153 tx: self.tx.clone(),
154 err_cb: self.err_cb,
155 };
156 Gauge::from_arc(Arc::new(handle))
157 }
158
159 fn register_histogram(&self, key: &Key, _meta: &Metadata<'_>) -> Histogram {
160 let name = key.name().to_string();
161 let handle = HistogramHandle {
162 name,
163 tx: self.tx.clone(),
164 err_cb: self.err_cb,
165 };
166 Histogram::from_arc(Arc::new(handle))
167 }
168}