limen_core/telemetry/
concurrent.rs1use crate::prelude::{Telemetry, TelemetryEvent, TelemetryKey};
4
5use std::sync::mpsc;
6use std::sync::{
7 atomic::{AtomicBool, Ordering},
8 Arc,
9};
10
11enum TelemetryMsg {
14 IncrCounter(TelemetryKey, u64),
15 SetGauge(TelemetryKey, u64),
16 RecordLatency(TelemetryKey, u64),
17 PushMetrics,
18 PushEvent(TelemetryEvent),
19 Flush,
20 Shutdown,
21}
22
23pub struct TelemetryCore<T: Telemetry> {
28 inner: T,
29 rx: mpsc::Receiver<TelemetryMsg>,
30 events_flag: Arc<AtomicBool>,
31}
32
33impl<T: Telemetry> TelemetryCore<T> {
34 fn new(inner: T, rx: mpsc::Receiver<TelemetryMsg>) -> (Self, Arc<AtomicBool>) {
39 let events_flag = Arc::new(AtomicBool::new(inner.events_enabled()));
40 (
41 Self {
42 inner,
43 rx,
44 events_flag: events_flag.clone(),
45 },
46 events_flag,
47 )
48 }
49
50 pub fn run(mut self) {
53 while let Ok(message) = self.rx.recv() {
54 let shutdown_requested = match message {
55 TelemetryMsg::IncrCounter(telemetry_key, counter_delta) if T::METRICS_ENABLED => {
56 self.inner.incr_counter(telemetry_key, counter_delta);
57 false
58 }
59 TelemetryMsg::SetGauge(telemetry_key, gauge_value) if T::METRICS_ENABLED => {
60 self.inner.set_gauge(telemetry_key, gauge_value);
61 false
62 }
63 TelemetryMsg::RecordLatency(telemetry_key, latency_value_ns)
64 if T::METRICS_ENABLED =>
65 {
66 self.inner
67 .record_latency_ns(telemetry_key, latency_value_ns);
68 false
69 }
70 TelemetryMsg::PushMetrics => {
71 self.inner.push_metrics();
72 false
73 }
74 TelemetryMsg::PushEvent(telemetry_event) if T::EVENTS_STATICALLY_ENABLED => {
75 self.inner.push_event(telemetry_event);
76 false
77 }
78 TelemetryMsg::Flush => {
79 self.inner.flush();
80 false
81 }
82 TelemetryMsg::Shutdown => true,
83 _ => false,
84 };
85
86 self.events_flag
87 .store(self.inner.events_enabled(), Ordering::Relaxed);
88
89 if shutdown_requested {
90 while let Ok(queued_message) = self.rx.try_recv() {
91 match queued_message {
92 TelemetryMsg::IncrCounter(telemetry_key, counter_delta)
93 if T::METRICS_ENABLED =>
94 {
95 self.inner.incr_counter(telemetry_key, counter_delta);
96 }
97 TelemetryMsg::SetGauge(telemetry_key, gauge_value)
98 if T::METRICS_ENABLED =>
99 {
100 self.inner.set_gauge(telemetry_key, gauge_value);
101 }
102 TelemetryMsg::RecordLatency(telemetry_key, latency_value_ns)
103 if T::METRICS_ENABLED =>
104 {
105 self.inner
106 .record_latency_ns(telemetry_key, latency_value_ns);
107 }
108 TelemetryMsg::PushMetrics => {
109 self.inner.push_metrics();
110 }
111 TelemetryMsg::PushEvent(telemetry_event)
112 if T::EVENTS_STATICALLY_ENABLED =>
113 {
114 self.inner.push_event(telemetry_event);
115 }
116 TelemetryMsg::Flush => {
117 self.inner.flush();
118 }
119 TelemetryMsg::Shutdown => {}
120 _ => {}
121 }
122
123 self.events_flag
124 .store(self.inner.events_enabled(), Ordering::Relaxed);
125 }
126
127 self.inner.flush();
128 return;
129 }
130 }
131
132 self.inner.flush();
133 }
134}
135
136pub struct TelemetrySender<T: Telemetry> {
139 tx: mpsc::Sender<TelemetryMsg>,
140 events_flag: Arc<AtomicBool>,
141 _marker: core::marker::PhantomData<T>,
142}
143
144impl<T: Telemetry> TelemetrySender<T> {
145 pub fn send_shutdown(&self) {
150 let _ = self.tx.send(TelemetryMsg::Shutdown);
151 }
152}
153
154impl<T: Telemetry> Clone for TelemetrySender<T> {
155 fn clone(&self) -> Self {
156 Self {
157 tx: self.tx.clone(),
158 events_flag: self.events_flag.clone(),
159 _marker: core::marker::PhantomData,
160 }
161 }
162}
163
164impl<T: Telemetry> Telemetry for TelemetrySender<T> {
165 const METRICS_ENABLED: bool = T::METRICS_ENABLED;
166 const EVENTS_STATICALLY_ENABLED: bool = T::EVENTS_STATICALLY_ENABLED;
167
168 #[inline]
169 fn incr_counter(&mut self, key: TelemetryKey, delta: u64) {
170 if !Self::METRICS_ENABLED {
171 return;
172 }
173 let _ = self.tx.send(TelemetryMsg::IncrCounter(key, delta));
175 }
176
177 #[inline]
178 fn set_gauge(&mut self, key: TelemetryKey, value: u64) {
179 if !Self::METRICS_ENABLED {
180 return;
181 }
182 let _ = self.tx.send(TelemetryMsg::SetGauge(key, value));
183 }
184
185 #[inline]
186 fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64) {
187 if !Self::METRICS_ENABLED {
188 return;
189 }
190 let _ = self.tx.send(TelemetryMsg::RecordLatency(key, value_ns));
191 }
192
193 #[inline]
194 fn push_metrics(&mut self) {
195 let _ = self.tx.send(TelemetryMsg::PushMetrics);
196 }
197
198 #[inline]
199 fn events_enabled(&self) -> bool {
200 Self::EVENTS_STATICALLY_ENABLED && self.events_flag.load(Ordering::Relaxed)
201 }
202
203 #[inline]
204 fn push_event(&mut self, event: TelemetryEvent) {
205 if !self.events_enabled() {
206 return;
207 }
208 let _ = self.tx.send(TelemetryMsg::PushEvent(event));
209 }
210
211 #[inline]
212 fn flush(&mut self) {
213 let _ = self.tx.send(TelemetryMsg::Flush);
214 }
215}
216
217pub struct TelemetryCoreHandle<T: Telemetry> {
220 sender: TelemetrySender<T>,
221 join_handle: Option<std::thread::JoinHandle<()>>,
222}
223
224pub fn new_telemetry_pair<T: Telemetry>(inner: T) -> (TelemetryCore<T>, TelemetrySender<T>) {
230 let (tx, rx) = mpsc::channel::<TelemetryMsg>();
231 let (core, events_flag) = TelemetryCore::new(inner, rx);
232 let sender = TelemetrySender {
233 tx,
234 events_flag,
235 _marker: core::marker::PhantomData,
236 };
237 (core, sender)
238}
239
240pub fn spawn_telemetry_core<T>(inner: T) -> TelemetryCoreHandle<T>
243where
244 T: Telemetry + Send + 'static,
245{
246 let (core, sender) = new_telemetry_pair(inner);
247 let join_handle = std::thread::spawn(move || core.run());
248 TelemetryCoreHandle {
249 sender,
250 join_handle: Some(join_handle),
251 }
252}
253
254impl<T: Telemetry> TelemetryCoreHandle<T> {
255 #[inline]
257 pub fn sender(&self) -> TelemetrySender<T> {
258 self.sender.clone()
259 }
260
261 pub fn shutdown_and_join(mut self) {
266 self.sender.send_shutdown();
267 if let Some(handle) = self.join_handle.take() {
268 let _ = handle.join();
269 }
270 }
271}