use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::Interval;
use tracing::info;
use crate::payload::{ClientInformation, EventWithTimestamp, TelemetryEvent, TelemetryPayload};
use crate::sink::{HttpClient, Sink};
const TELEMETRY_PUSH_COOLDOWN: Duration = Duration::from_secs(60);
const LAST_REQUEST_TIMEOUT: Duration = Duration::from_secs(1);
const MAX_NUM_EVENTS_IN_QUEUE: usize = 10;
/// Test-only trigger for a manual [`Clock`]: every `tick` releases one tick of the clock
/// created alongside it by `Clock::manual`.
#[cfg(test)]
struct ClockButton(Sender<()>);

#[cfg(test)]
impl ClockButton {
    /// Releases one tick; waits if the previous tick has not been consumed yet.
    async fn tick(&self) {
        let Self(sender) = self;
        // A closed channel only means the clock is gone, which tests can ignore.
        let _ = sender.send(()).await;
    }
}
/// Paces the telemetry loop: each completed `tick` marks the end of one push cooldown.
enum Clock {
    /// Backed by a tokio [`Interval`]; its first tick completes immediately.
    Periodical(Mutex<Interval>),
    /// Test clock advanced explicitly through a `ClockButton`.
    #[cfg(test)]
    Manual(Mutex<Receiver<()>>),
}

impl Clock {
    /// Builds a clock that ticks every `period`.
    ///
    /// Missed ticks are *delayed* instead of burst. After a tick, the telemetry loop can
    /// wait a long time for events. With tokio's default `Burst` behaviour, every period
    /// spent idle would become an already-due tick, and the following pushes would go out
    /// back-to-back with no cooldown at all.
    pub fn periodical(period: Duration) -> Clock {
        let mut interval = tokio::time::interval(period);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        Clock::Periodical(Mutex::new(interval))
    }

    /// Builds a manually driven clock and the button that drives it.
    ///
    /// One tick is pre-loaded so the first `tick` completes immediately, mirroring
    /// [`Clock::periodical`].
    #[cfg(test)]
    pub async fn manual() -> (ClockButton, Clock) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let _ = tx.send(()).await;
        let button = ClockButton(tx);
        (button, Clock::Manual(Mutex::new(rx)))
    }

    /// Waits for the next tick.
    ///
    /// For the manual clock, this returns immediately once every `ClockButton` is dropped,
    /// because `recv` then yields `None`.
    async fn tick(&self) {
        match self {
            Clock::Periodical(interval) => {
                interval.lock().await.tick().await;
            }
            #[cfg(test)]
            Clock::Manual(channel) => {
                channel.lock().await.recv().await;
            }
        }
    }
}
/// Events buffered between two pushes to the sink, plus the number of events rejected
/// because the buffer already held `MAX_NUM_EVENTS_IN_QUEUE` entries.
#[derive(Default)]
struct EventsState {
    events: Vec<EventWithTimestamp>,
    num_dropped_events: usize,
}

impl EventsState {
    /// Takes the buffered events and the drop counter, leaving an empty state in place.
    fn drain_events(&mut self) -> EventsState {
        mem::take(self)
    }

    /// Buffers `event`, or only counts it as dropped when the buffer is full.
    ///
    /// Returns `true` exactly when the event was accepted into a previously empty buffer.
    /// The caller uses this to signal that items became available.
    fn push_event(&mut self, event: TelemetryEvent) -> bool {
        let queue_is_full = self.events.len() >= MAX_NUM_EVENTS_IN_QUEUE;
        if queue_is_full {
            self.num_dropped_events += 1;
            false
        } else {
            let was_empty = self.events.is_empty();
            self.events.push(EventWithTimestamp::from(event));
            was_empty
        }
    }
}
/// Event queue shared by producers and the telemetry loop.
///
/// A capacity-1 channel carries one wake-up for each transition of the queue from empty
/// to non-empty, so the consumer can sleep until there is something to send.
struct Events {
    state: RwLock<EventsState>,
    items_available_tx: Sender<()>,
    items_available_rx: RwLock<Receiver<()>>,
}

impl Default for Events {
    fn default() -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        Self {
            state: RwLock::new(EventsState::default()),
            items_available_tx: tx,
            items_available_rx: RwLock::new(rx),
        }
    }
}

impl Events {
    /// Waits until at least one event was queued, then takes everything queued so far.
    async fn drain_events(&self) -> EventsState {
        {
            let mut receiver = self.items_available_rx.write().await;
            receiver.recv().await;
        }
        let mut state = self.state.write().await;
        state.drain_events()
    }

    /// Queues `event`, waking the consumer if the queue was empty before.
    async fn push_event(&self, event: TelemetryEvent) {
        let became_non_empty = {
            let mut state = self.state.write().await;
            state.push_event(event)
        };
        if !became_non_empty {
            return;
        }
        let _ = self.items_available_tx.send(()).await;
    }
}
pub(crate) struct Inner {
sink: Option<Box<dyn Sink>>,
client_information: ClientInformation,
events: Events,
clock: Clock,
is_started: AtomicBool,
}
impl Inner {
pub fn is_disabled(&self) -> bool {
self.sink.is_none()
}
async fn create_telemetry_payload(&self) -> TelemetryPayload {
let events_state = self.events.drain_events().await;
TelemetryPayload {
client_information: self.client_information.clone(),
events: events_state.events,
num_dropped_events: events_state.num_dropped_events,
}
}
async fn send_pending_events(&self) {
if let Some(sink) = self.sink.as_ref() {
let payload = self.create_telemetry_payload().await;
sink.send_payload(payload).await;
}
}
async fn send(&self, event: TelemetryEvent) {
if self.is_disabled() {
return;
}
self.events.push_event(event).await;
}
}
/// Records telemetry events. Events are queued in `inner` and pushed to the sink by the
/// loop started with [`TelemetrySender::start_loop`].
pub struct TelemetrySender {
    /// Shared with the background task spawned by `start_loop`.
    pub(crate) inner: Arc<Inner>,
}
/// Handle on the background loop returned by [`TelemetrySender::start_loop`].
pub enum TelemetryLoopHandle {
    /// Telemetry is disabled; no task was spawned.
    NoLoop,
    /// A task is running and can be asked to stop through `terminate_command_tx`.
    WithLoop {
        join_handle: JoinHandle<()>,
        terminate_command_tx: oneshot::Sender<()>,
    },
}

impl TelemetryLoopHandle {
    /// Asks the loop to stop and waits at most `LAST_REQUEST_TIMEOUT` for it to finish.
    ///
    /// If the timeout elapses, the join handle is dropped, which detaches the task.
    pub async fn terminate_telemetry(self) {
        match self {
            Self::NoLoop => {}
            Self::WithLoop {
                join_handle,
                terminate_command_tx,
            } => {
                // The task may already have exited; a failed send is harmless.
                let _ = terminate_command_tx.send(());
                let _ = tokio::time::timeout(LAST_REQUEST_TIMEOUT, join_handle).await;
            }
        }
    }
}
impl TelemetrySender {
fn new<S: Sink>(sink_opt: Option<S>, clock: Clock) -> TelemetrySender {
let sink_opt: Option<Box<dyn Sink>> = if let Some(sink) = sink_opt {
Some(Box::new(sink))
} else {
None
};
TelemetrySender {
inner: Arc::new(Inner {
sink: sink_opt,
client_information: ClientInformation::default(),
events: Events::default(),
clock,
is_started: AtomicBool::new(false),
}),
}
}
pub fn start_loop(&self) -> TelemetryLoopHandle {
let (terminate_command_tx, mut terminate_command_rx) = oneshot::channel();
if self.inner.is_disabled() {
return TelemetryLoopHandle::NoLoop;
}
assert!(
self.inner
.is_started
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok(),
"The telemetry loop is already started."
);
let inner = self.inner.clone();
let join_handle = tokio::task::spawn(async move {
loop {
let quit_loop = tokio::select! {
_ = (&mut terminate_command_rx) => { true }
_ = inner.clock.tick() => { false }
};
let _ = inner.send_pending_events().await;
if quit_loop {
break;
}
}
});
TelemetryLoopHandle::WithLoop {
join_handle,
terminate_command_tx,
}
}
pub async fn send(&self, event: TelemetryEvent) {
self.inner.send(event).await;
}
}
/// Telemetry is on unless the `DISABLE_TELEMETRY_ENV_KEY` environment variable is set
/// (to any value, including the empty string).
pub fn is_telemetry_enabled() -> bool {
    let disable_flag = std::env::var_os(crate::DISABLE_TELEMETRY_ENV_KEY);
    disable_flag.is_none()
}
/// Returns the HTTP sink when telemetry is enabled and `HttpClient::try_new` succeeds,
/// and logs which of the two outcomes happened.
fn create_http_client() -> Option<HttpClient> {
    let client_opt = if !is_telemetry_enabled() {
        None
    } else {
        HttpClient::try_new()
    };
    match client_opt.as_ref() {
        Some(client) => info!("telemetry to {} is enabled.", client.endpoint()),
        None => info!("telemetry to quickwit is disabled."),
    }
    client_opt
}
impl Default for TelemetrySender {
    /// Sender backed by the HTTP sink (if enabled), pushing at most once per
    /// `TELEMETRY_PUSH_COOLDOWN`.
    fn default() -> Self {
        // Build the sink first so its log line is emitted before the clock is created.
        let sink_opt = create_http_client();
        let clock = Clock::periodical(TELEMETRY_PUSH_COOLDOWN);
        Self::new(sink_opt, clock)
    }
}
#[cfg(test)]
mod tests {
    use std::env;

    use super::*;

    /// Setting the disable variable (even to "") turns telemetry off.
    #[ignore]
    #[tokio::test]
    async fn test_enabling_and_disabling_telemetry() {
        env::set_var(crate::DISABLE_TELEMETRY_ENV_KEY, "");
        assert!(TelemetrySender::default().inner.is_disabled());
        env::remove_var(crate::DISABLE_TELEMETRY_ENV_KEY);
        assert!(!TelemetrySender::default().inner.is_disabled());
    }

    /// The first event is pushed without waiting for a manual tick.
    #[tokio::test]
    async fn test_telemetry_no_wait_for_first_event() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (_clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        let payload = rx
            .recv()
            .await
            .expect("the first event should be pushed without waiting for a tick");
        assert_eq!(payload.events.len(), 1);
        loop_handler.terminate_telemetry().await;
    }

    /// After a tick, the next event is pushed on its own.
    #[tokio::test]
    async fn test_telemetry_two_events() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 1);
        }
        clock_btn.tick().await;
        telemetry_sender.send(TelemetryEvent::Create).await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 1);
        }
        loop_handler.terminate_telemetry().await;
    }

    /// Events sent before the next tick are batched into a single payload.
    #[tokio::test]
    async fn test_telemetry_cooldown_observed() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 1);
        }
        tokio::task::yield_now().await;
        telemetry_sender.send(TelemetryEvent::Create).await;
        let timeout_res = tokio::time::timeout(Duration::from_millis(1), rx.recv()).await;
        assert!(timeout_res.is_err());
        telemetry_sender.send(TelemetryEvent::Create).await;
        clock_btn.tick().await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 2);
        }
        loop_handler.terminate_telemetry().await;
    }

    /// Terminating the loop flushes events queued during the cooldown.
    #[tokio::test]
    async fn test_terminate_telemetry_sends_pending_events() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (_clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        let payload = rx.recv().await.unwrap();
        assert_eq!(payload.events.len(), 1);
        telemetry_sender
            .send(TelemetryEvent::EndCommand { return_code: 2i32 })
            .await;
        loop_handler.terminate_telemetry().await;
        let payload = rx.recv().await.unwrap();
        assert_eq!(payload.events.len(), 1);
        assert!(matches!(
            &payload.events[0].event,
            &TelemetryEvent::EndCommand { .. }
        ));
    }
}