Skip to main content

limen_core/telemetry/
concurrent.rs

//! Concurrency-enabled wrapper for telemetry implementations.
2
3use crate::prelude::{Telemetry, TelemetryEvent, TelemetryKey};
4
5use std::sync::mpsc;
6use std::sync::{
7    atomic::{AtomicBool, Ordering},
8    Arc,
9};
10
11/// Internal message type sent from `TelemetrySender` instances to the
12/// single-threaded `TelemetryCore`.
13enum TelemetryMsg {
14    IncrCounter(TelemetryKey, u64),
15    SetGauge(TelemetryKey, u64),
16    RecordLatency(TelemetryKey, u64),
17    PushMetrics,
18    PushEvent(TelemetryEvent),
19    Flush,
20    Shutdown,
21}
22
23/// Single-threaded core that owns the underlying telemetry implementation `T`.
24///
25/// All metric and event updates are serialized through this core, which runs
26/// in a dedicated thread or is driven by the host runtime.
27pub struct TelemetryCore<T: Telemetry> {
28    inner: T,
29    rx: mpsc::Receiver<TelemetryMsg>,
30    events_flag: Arc<AtomicBool>,
31}
32
33impl<T: Telemetry> TelemetryCore<T> {
34    /// Construct a new `TelemetryCore` around `inner`, using the given receiver.
35    ///
36    /// Returns the core and an `Arc<AtomicBool>` that mirrors
37    /// `inner.events_enabled()` for cheap access on the sender side.
38    fn new(inner: T, rx: mpsc::Receiver<TelemetryMsg>) -> (Self, Arc<AtomicBool>) {
39        let events_flag = Arc::new(AtomicBool::new(inner.events_enabled()));
40        (
41            Self {
42                inner,
43                rx,
44                events_flag: events_flag.clone(),
45            },
46            events_flag,
47        )
48    }
49
50    /// Drive the telemetry core loop until all senders are dropped or a
51    /// `Shutdown` message is received.
52    pub fn run(mut self) {
53        while let Ok(message) = self.rx.recv() {
54            let shutdown_requested = match message {
55                TelemetryMsg::IncrCounter(telemetry_key, counter_delta) if T::METRICS_ENABLED => {
56                    self.inner.incr_counter(telemetry_key, counter_delta);
57                    false
58                }
59                TelemetryMsg::SetGauge(telemetry_key, gauge_value) if T::METRICS_ENABLED => {
60                    self.inner.set_gauge(telemetry_key, gauge_value);
61                    false
62                }
63                TelemetryMsg::RecordLatency(telemetry_key, latency_value_ns)
64                    if T::METRICS_ENABLED =>
65                {
66                    self.inner
67                        .record_latency_ns(telemetry_key, latency_value_ns);
68                    false
69                }
70                TelemetryMsg::PushMetrics => {
71                    self.inner.push_metrics();
72                    false
73                }
74                TelemetryMsg::PushEvent(telemetry_event) if T::EVENTS_STATICALLY_ENABLED => {
75                    self.inner.push_event(telemetry_event);
76                    false
77                }
78                TelemetryMsg::Flush => {
79                    self.inner.flush();
80                    false
81                }
82                TelemetryMsg::Shutdown => true,
83                _ => false,
84            };
85
86            self.events_flag
87                .store(self.inner.events_enabled(), Ordering::Relaxed);
88
89            if shutdown_requested {
90                while let Ok(queued_message) = self.rx.try_recv() {
91                    match queued_message {
92                        TelemetryMsg::IncrCounter(telemetry_key, counter_delta)
93                            if T::METRICS_ENABLED =>
94                        {
95                            self.inner.incr_counter(telemetry_key, counter_delta);
96                        }
97                        TelemetryMsg::SetGauge(telemetry_key, gauge_value)
98                            if T::METRICS_ENABLED =>
99                        {
100                            self.inner.set_gauge(telemetry_key, gauge_value);
101                        }
102                        TelemetryMsg::RecordLatency(telemetry_key, latency_value_ns)
103                            if T::METRICS_ENABLED =>
104                        {
105                            self.inner
106                                .record_latency_ns(telemetry_key, latency_value_ns);
107                        }
108                        TelemetryMsg::PushMetrics => {
109                            self.inner.push_metrics();
110                        }
111                        TelemetryMsg::PushEvent(telemetry_event)
112                            if T::EVENTS_STATICALLY_ENABLED =>
113                        {
114                            self.inner.push_event(telemetry_event);
115                        }
116                        TelemetryMsg::Flush => {
117                            self.inner.flush();
118                        }
119                        TelemetryMsg::Shutdown => {}
120                        _ => {}
121                    }
122
123                    self.events_flag
124                        .store(self.inner.events_enabled(), Ordering::Relaxed);
125                }
126
127                self.inner.flush();
128                return;
129            }
130        }
131
132        self.inner.flush();
133    }
134}
135
136/// Sender-side handle that implements `Telemetry` and forwards all calls
137/// to the core via a multi-producer, single-consumer channel.
138pub struct TelemetrySender<T: Telemetry> {
139    tx: mpsc::Sender<TelemetryMsg>,
140    events_flag: Arc<AtomicBool>,
141    _marker: core::marker::PhantomData<T>,
142}
143
144impl<T: Telemetry> TelemetrySender<T> {
145    /// Send an explicit shutdown message to the telemetry core.
146    ///
147    /// After this call, the core will exit its run loop once it processes
148    /// the message and perform a final flush.
149    pub fn send_shutdown(&self) {
150        let _ = self.tx.send(TelemetryMsg::Shutdown);
151    }
152}
153
154impl<T: Telemetry> Clone for TelemetrySender<T> {
155    fn clone(&self) -> Self {
156        Self {
157            tx: self.tx.clone(),
158            events_flag: self.events_flag.clone(),
159            _marker: core::marker::PhantomData,
160        }
161    }
162}
163
164impl<T: Telemetry> Telemetry for TelemetrySender<T> {
165    const METRICS_ENABLED: bool = T::METRICS_ENABLED;
166    const EVENTS_STATICALLY_ENABLED: bool = T::EVENTS_STATICALLY_ENABLED;
167
168    #[inline]
169    fn incr_counter(&mut self, key: TelemetryKey, delta: u64) {
170        if !Self::METRICS_ENABLED {
171            return;
172        }
173        // For very hot paths consider mpsc::Sender::send vs try_send on a bounded channel.
174        let _ = self.tx.send(TelemetryMsg::IncrCounter(key, delta));
175    }
176
177    #[inline]
178    fn set_gauge(&mut self, key: TelemetryKey, value: u64) {
179        if !Self::METRICS_ENABLED {
180            return;
181        }
182        let _ = self.tx.send(TelemetryMsg::SetGauge(key, value));
183    }
184
185    #[inline]
186    fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64) {
187        if !Self::METRICS_ENABLED {
188            return;
189        }
190        let _ = self.tx.send(TelemetryMsg::RecordLatency(key, value_ns));
191    }
192
193    #[inline]
194    fn push_metrics(&mut self) {
195        let _ = self.tx.send(TelemetryMsg::PushMetrics);
196    }
197
198    #[inline]
199    fn events_enabled(&self) -> bool {
200        Self::EVENTS_STATICALLY_ENABLED && self.events_flag.load(Ordering::Relaxed)
201    }
202
203    #[inline]
204    fn push_event(&mut self, event: TelemetryEvent) {
205        if !self.events_enabled() {
206            return;
207        }
208        let _ = self.tx.send(TelemetryMsg::PushEvent(event));
209    }
210
211    #[inline]
212    fn flush(&mut self) {
213        let _ = self.tx.send(TelemetryMsg::Flush);
214    }
215}
216
217/// Handle returned by `spawn_telemetry_core`, owning the core thread and
218/// providing a clonable sender for worker threads.
219pub struct TelemetryCoreHandle<T: Telemetry> {
220    sender: TelemetrySender<T>,
221    join_handle: Option<std::thread::JoinHandle<()>>,
222}
223
224/// Construct a telemetry core and its corresponding sender without spawning
225/// a dedicated thread.
226///
227/// This is useful when the host runtime wants to own the core and drive
228/// `TelemetryCore::run()` or a custom loop itself.
229pub fn new_telemetry_pair<T: Telemetry>(inner: T) -> (TelemetryCore<T>, TelemetrySender<T>) {
230    let (tx, rx) = mpsc::channel::<TelemetryMsg>();
231    let (core, events_flag) = TelemetryCore::new(inner, rx);
232    let sender = TelemetrySender {
233        tx,
234        events_flag,
235        _marker: core::marker::PhantomData,
236    };
237    (core, sender)
238}
239
240/// Spawn a dedicated thread that owns `inner` and processes telemetry messages,
241/// returning a handle that can be used to obtain senders and shut the core down.
242pub fn spawn_telemetry_core<T>(inner: T) -> TelemetryCoreHandle<T>
243where
244    T: Telemetry + Send + 'static,
245{
246    let (core, sender) = new_telemetry_pair(inner);
247    let join_handle = std::thread::spawn(move || core.run());
248    TelemetryCoreHandle {
249        sender,
250        join_handle: Some(join_handle),
251    }
252}
253
254impl<T: Telemetry> TelemetryCoreHandle<T> {
255    /// Return a clonable `TelemetrySender<T>` that can be passed to nodes.
256    #[inline]
257    pub fn sender(&self) -> TelemetrySender<T> {
258        self.sender.clone()
259    }
260
261    /// Send a shutdown signal to the core and join the thread.
262    ///
263    /// This should be called during runtime shutdown to ensure that all
264    /// telemetry is flushed before exit.
265    pub fn shutdown_and_join(mut self) {
266        self.sender.send_shutdown();
267        if let Some(handle) = self.join_handle.take() {
268            let _ = handle.join();
269        }
270    }
271}