use crate::prelude::{Telemetry, TelemetryEvent, TelemetryKey};
use std::sync::mpsc;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
/// Commands carried over the channel from a `TelemetrySender` to the `TelemetryCore`.
///
/// Every variant except `Shutdown` corresponds to one `Telemetry` method that the core
/// calls on its wrapped backend when the message is received.
enum TelemetryMsg {
/// Replayed as `incr_counter(key, delta)`.
IncrCounter(TelemetryKey, u64),
/// Replayed as `set_gauge(key, value)`.
SetGauge(TelemetryKey, u64),
/// Replayed as `record_latency_ns(key, value_ns)`.
RecordLatency(TelemetryKey, u64),
/// Replayed as `push_metrics()`.
PushMetrics,
/// Replayed as `push_event(event)`.
PushEvent(TelemetryEvent),
/// Replayed as `flush()`.
Flush,
/// Asks the core loop to drain what is already queued and then stop.
Shutdown,
}
/// Receiving half of the telemetry pipeline.
///
/// Owns the wrapped backend `T` and applies the messages read from `rx` to it.
pub struct TelemetryCore<T: Telemetry> {
/// Backend that received messages are applied to.
inner: T,
/// Queue of messages waiting to be applied.
rx: mpsc::Receiver<TelemetryMsg>,
/// Mirror of `inner.events_enabled()`; a second handle to it is returned by `new`.
events_flag: Arc<AtomicBool>,
}
impl<T: Telemetry> TelemetryCore<T> {
    /// Wraps `inner` and returns the core together with a handle to the events flag.
    ///
    /// The flag is initialised from `inner.events_enabled()`. The core keeps one handle
    /// and refreshes the flag after every message it handles; the returned handle is for
    /// the sending side.
    fn new(inner: T, rx: mpsc::Receiver<TelemetryMsg>) -> (Self, Arc<AtomicBool>) {
        let events_flag = Arc::new(AtomicBool::new(inner.events_enabled()));
        let core = Self {
            inner,
            rx,
            events_flag: Arc::clone(&events_flag),
        };
        (core, events_flag)
    }

    /// Applies incoming messages to the backend until the pipeline stops.
    ///
    /// The loop ends in one of two ways:
    /// * the channel is closed (every sender dropped), or
    /// * a `Shutdown` message arrives, in which case everything that is already queued
    ///   at that moment is still applied (without blocking) before stopping.
    ///
    /// In both cases the backend is flushed exactly once on the way out.
    pub fn run(mut self) {
        while let Ok(message) = self.rx.recv() {
            if self.apply(message) {
                // Shutdown requested: drain what is already in the queue, never block again.
                while let Ok(queued) = self.rx.try_recv() {
                    // Further `Shutdown` messages are no-ops while draining.
                    self.apply(queued);
                }
                break;
            }
        }
        self.inner.flush();
    }

    /// Applies a single message to the backend and republishes the events flag.
    ///
    /// Metric messages are ignored when `T::METRICS_ENABLED` is false and events are
    /// ignored when `T::EVENTS_STATICALLY_ENABLED` is false. Returns `true` only for
    /// `Shutdown`.
    ///
    /// The match is deliberately exhaustive (no `_` arm) so that adding a variant to
    /// `TelemetryMsg` forces a decision here.
    fn apply(&mut self, message: TelemetryMsg) -> bool {
        let shutdown_requested = match message {
            TelemetryMsg::IncrCounter(key, delta) => {
                if T::METRICS_ENABLED {
                    self.inner.incr_counter(key, delta);
                }
                false
            }
            TelemetryMsg::SetGauge(key, value) => {
                if T::METRICS_ENABLED {
                    self.inner.set_gauge(key, value);
                }
                false
            }
            TelemetryMsg::RecordLatency(key, value_ns) => {
                if T::METRICS_ENABLED {
                    self.inner.record_latency_ns(key, value_ns);
                }
                false
            }
            TelemetryMsg::PushMetrics => {
                self.inner.push_metrics();
                false
            }
            TelemetryMsg::PushEvent(event) => {
                if T::EVENTS_STATICALLY_ENABLED {
                    self.inner.push_event(event);
                }
                false
            }
            TelemetryMsg::Flush => {
                self.inner.flush();
                false
            }
            TelemetryMsg::Shutdown => true,
        };
        // The backend may have changed its mind while handling the message.
        self.events_flag
            .store(self.inner.events_enabled(), Ordering::Relaxed);
        shutdown_requested
    }
}
/// Sending half of the telemetry pipeline.
///
/// Holds only a channel endpoint and a shared flag; the backend `T` itself is never
/// stored here.
pub struct TelemetrySender<T: Telemetry> {
    /// Channel endpoint that messages are pushed into.
    tx: mpsc::Sender<TelemetryMsg>,
    /// Runtime "events enabled" switch, read before an event is queued.
    events_flag: Arc<AtomicBool>,
    /// Ties the sender to the backend type `T` without owning a `T`.
    ///
    /// `fn() -> T` is used instead of plain `T` so that the sender's `Send`/`Sync` auto
    /// traits do not depend on `T`: a sender can be moved to another thread even when
    /// the backend is not `Send`.
    _marker: core::marker::PhantomData<fn() -> T>,
}
impl<T: Telemetry> TelemetrySender<T> {
    /// Queues a `Shutdown` message for the core.
    ///
    /// Best effort: if the receiving end is already gone the send error is discarded.
    pub fn send_shutdown(&self) {
        self.tx.send(TelemetryMsg::Shutdown).ok();
    }
}
impl<T: Telemetry> Clone for TelemetrySender<T> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
events_flag: self.events_flag.clone(),
_marker: core::marker::PhantomData,
}
}
}
/// `Telemetry` front end that turns every call into a message for the core.
///
/// All sends are best effort: if the receiving end is gone the error is discarded.
impl<T: Telemetry> Telemetry for TelemetrySender<T> {
    /// Same compile-time metrics switch as the backend type.
    const METRICS_ENABLED: bool = T::METRICS_ENABLED;
    /// Same compile-time events switch as the backend type.
    const EVENTS_STATICALLY_ENABLED: bool = T::EVENTS_STATICALLY_ENABLED;

    /// Queues a counter increment; does nothing when metrics are compiled out.
    #[inline]
    fn incr_counter(&mut self, key: TelemetryKey, delta: u64) {
        if Self::METRICS_ENABLED {
            self.tx.send(TelemetryMsg::IncrCounter(key, delta)).ok();
        }
    }

    /// Queues a gauge update; does nothing when metrics are compiled out.
    #[inline]
    fn set_gauge(&mut self, key: TelemetryKey, value: u64) {
        if Self::METRICS_ENABLED {
            self.tx.send(TelemetryMsg::SetGauge(key, value)).ok();
        }
    }

    /// Queues a latency sample; does nothing when metrics are compiled out.
    #[inline]
    fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64) {
        if Self::METRICS_ENABLED {
            self.tx.send(TelemetryMsg::RecordLatency(key, value_ns)).ok();
        }
    }

    /// Queues a `PushMetrics` request (not gated by `METRICS_ENABLED`).
    #[inline]
    fn push_metrics(&mut self) {
        self.tx.send(TelemetryMsg::PushMetrics).ok();
    }

    /// True only when events are compiled in and the shared runtime flag is set.
    #[inline]
    fn events_enabled(&self) -> bool {
        if !Self::EVENTS_STATICALLY_ENABLED {
            return false;
        }
        self.events_flag.load(Ordering::Relaxed)
    }

    /// Queues an event; the event is dropped here when `events_enabled()` is false.
    #[inline]
    fn push_event(&mut self, event: TelemetryEvent) {
        if self.events_enabled() {
            self.tx.send(TelemetryMsg::PushEvent(event)).ok();
        }
    }

    /// Queues a `Flush` request; returns without waiting for it to be handled.
    #[inline]
    fn flush(&mut self) {
        self.tx.send(TelemetryMsg::Flush).ok();
    }
}
/// Handle to a telemetry core running on its own thread, as built by
/// `spawn_telemetry_core`.
pub struct TelemetryCoreHandle<T: Telemetry> {
/// Sender connected to the spawned core; `sender()` hands out clones of it.
sender: TelemetrySender<T>,
/// Join handle of the core thread; an `Option` so `shutdown_and_join` can `take()` it.
join_handle: Option<std::thread::JoinHandle<()>>,
}
/// Builds a connected core/sender pair around `inner` without spawning a thread.
///
/// The two halves share one channel and one events flag. The caller decides where
/// `TelemetryCore::run` is executed.
pub fn new_telemetry_pair<T: Telemetry>(inner: T) -> (TelemetryCore<T>, TelemetrySender<T>) {
    let (tx, rx) = mpsc::channel();
    let (telemetry_core, events_flag) = TelemetryCore::new(inner, rx);
    (
        telemetry_core,
        TelemetrySender {
            tx,
            events_flag,
            _marker: core::marker::PhantomData,
        },
    )
}
/// Starts a telemetry core for `inner` on a new thread and returns a handle to it.
///
/// The handle carries a sender connected to the core and the thread's join handle.
pub fn spawn_telemetry_core<T>(inner: T) -> TelemetryCoreHandle<T>
where
    T: Telemetry + Send + 'static,
{
    let (worker, sender) = new_telemetry_pair(inner);
    let worker_thread = std::thread::spawn(move || worker.run());
    let join_handle = Some(worker_thread);
    TelemetryCoreHandle {
        sender,
        join_handle,
    }
}
impl<T: Telemetry> TelemetryCoreHandle<T> {
    /// Returns a new sender connected to the same core.
    #[inline]
    pub fn sender(&self) -> TelemetrySender<T> {
        Clone::clone(&self.sender)
    }

    /// Sends `Shutdown` to the core and waits for its thread to finish.
    ///
    /// The result of the join is discarded, so a panic on the core thread is not
    /// propagated to the caller.
    pub fn shutdown_and_join(mut self) {
        self.sender.send_shutdown();
        if let Some(worker_thread) = self.join_handle.take() {
            worker_thread.join().ok();
        }
    }
}