1use std::mem;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use std::time::Duration;
24
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::sync::{oneshot, Mutex, RwLock};
27use tokio::task::JoinHandle;
28use tokio::time::Interval;
29use tracing::info;
30
31use crate::payload::{ClientInformation, EventWithTimestamp, TelemetryEvent, TelemetryPayload};
32use crate::sink::{HttpClient, Sink};
33
/// Period of the production clock: at most one telemetry push is attempted per
/// cooldown period (see `TelemetrySender::default`).
const TELEMETRY_PUSH_COOLDOWN: Duration = Duration::from_secs(60);

/// Upper bound on how long `TelemetryLoopHandle::terminate_telemetry` waits for
/// the telemetry loop task to finish after asking it to stop.
const LAST_REQUEST_TIMEOUT: Duration = Duration::from_secs(1);

/// Maximum number of events buffered between two pushes. Events arriving while
/// the buffer is full are not stored; they are only counted as dropped.
const MAX_NUM_EVENTS_IN_QUEUE: usize = 10;
42
/// Test-only handle used to make a `Clock::Manual` tick on demand.
#[cfg(test)]
struct ClockButton(Sender<()>);

#[cfg(test)]
impl ClockButton {
    /// Emits one tick. A failure to send (receiver gone) is deliberately ignored.
    async fn tick(&self) {
        let ClockButton(tick_tx) = self;
        let _ = tick_tx.send(()).await;
    }
}
52
53enum Clock {
54 Periodical(Mutex<Interval>),
55 #[cfg(test)]
56 Manual(Mutex<Receiver<()>>),
57}
58
59impl Clock {
60 pub fn periodical(period: Duration) -> Clock {
61 let interval = tokio::time::interval(period);
62 Clock::Periodical(Mutex::new(interval))
63 }
64
65 #[cfg(test)]
66 pub async fn manual() -> (ClockButton, Clock) {
67 let (tx, rx) = tokio::sync::mpsc::channel(1);
68 let _ = tx.send(()).await;
69 let button = ClockButton(tx);
70 (button, Clock::Manual(Mutex::new(rx)))
71 }
72
73 async fn tick(&self) {
74 match self {
75 Clock::Periodical(interval) => {
76 interval.lock().await.tick().await;
77 }
78 #[cfg(test)]
79 Clock::Manual(channel) => {
80 channel.lock().await.recv().await;
81 }
82 }
83 }
84}
85
86#[derive(Default)]
87struct EventsState {
88 events: Vec<EventWithTimestamp>,
89 num_dropped_events: usize,
90}
91
92impl EventsState {
93 fn drain_events(&mut self) -> EventsState {
94 mem::replace(
95 self,
96 EventsState {
97 events: Vec::new(),
98 num_dropped_events: 0,
99 },
100 )
101 }
102
103 fn push_event(&mut self, event: TelemetryEvent) -> bool {
107 if self.events.len() >= MAX_NUM_EVENTS_IN_QUEUE {
108 self.num_dropped_events += 1;
109 return false;
110 }
111 let events_was_empty = self.events.is_empty();
112 self.events.push(EventWithTimestamp::from(event));
113 events_was_empty
114 }
115}
116
117struct Events {
118 state: RwLock<EventsState>,
119 items_available_tx: Sender<()>,
120 items_available_rx: RwLock<Receiver<()>>,
121}
122
123impl Default for Events {
124 fn default() -> Self {
125 let (items_available_tx, items_available_rx) = tokio::sync::mpsc::channel(1);
126 Events {
127 state: RwLock::new(EventsState::default()),
128 items_available_tx,
129 items_available_rx: RwLock::new(items_available_rx),
130 }
131 }
132}
133
134impl Events {
135 async fn drain_events(&self) -> EventsState {
138 self.items_available_rx.write().await.recv().await;
139 self.state.write().await.drain_events()
140 }
141
142 async fn push_event(&self, event: TelemetryEvent) {
143 let is_first_event = self.state.write().await.push_event(event);
144 if is_first_event {
145 let _ = self.items_available_tx.send(()).await;
146 }
147 }
148}
149
150pub(crate) struct Inner {
151 sink: Option<Box<dyn Sink>>,
152 client_information: ClientInformation,
153 events: Events,
155 clock: Clock,
156 is_started: AtomicBool,
157}
158
159impl Inner {
160 pub fn is_disabled(&self) -> bool {
161 self.sink.is_none()
162 }
163
164 async fn create_telemetry_payload(&self) -> TelemetryPayload {
165 let events_state = self.events.drain_events().await;
166 TelemetryPayload {
167 client_information: self.client_information.clone(),
168 events: events_state.events,
169 num_dropped_events: events_state.num_dropped_events,
170 }
171 }
172
173 async fn send_pending_events(&self) {
178 if let Some(sink) = self.sink.as_ref() {
179 let payload = self.create_telemetry_payload().await;
180 sink.send_payload(payload).await;
181 }
182 }
183
184 async fn send(&self, event: TelemetryEvent) {
185 if self.is_disabled() {
186 return;
187 }
188 self.events.push_event(event).await;
189 }
190}
191
/// Entry point for recording telemetry events.
///
/// Events passed to `send` are buffered in the shared `Inner` state; they are
/// pushed to the sink by the background loop created by `start_loop`.
pub struct TelemetrySender {
    pub(crate) inner: Arc<Inner>,
}
195
196pub enum TelemetryLoopHandle {
197 NoLoop,
198 WithLoop {
199 join_handle: JoinHandle<()>,
200 terminate_command_tx: oneshot::Sender<()>,
201 },
202}
203
204impl TelemetryLoopHandle {
205 pub async fn terminate_telemetry(self) {
209 if let Self::WithLoop {
210 join_handle,
211 terminate_command_tx,
212 } = self
213 {
214 let _ = terminate_command_tx.send(());
215 let _ = tokio::time::timeout(LAST_REQUEST_TIMEOUT, join_handle).await;
216 }
217 }
218}
219
220impl TelemetrySender {
221 fn new<S: Sink>(sink_opt: Option<S>, clock: Clock) -> TelemetrySender {
222 let sink_opt: Option<Box<dyn Sink>> = if let Some(sink) = sink_opt {
223 Some(Box::new(sink))
224 } else {
225 None
226 };
227 TelemetrySender {
228 inner: Arc::new(Inner {
229 sink: sink_opt,
230 client_information: ClientInformation::default(),
231 events: Events::default(),
232 clock,
233 is_started: AtomicBool::new(false),
234 }),
235 }
236 }
237
238 pub fn start_loop(&self) -> TelemetryLoopHandle {
239 let (terminate_command_tx, mut terminate_command_rx) = oneshot::channel();
240 if self.inner.is_disabled() {
241 return TelemetryLoopHandle::NoLoop;
242 }
243
244 assert!(
245 self.inner
246 .is_started
247 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
248 .is_ok(),
249 "The telemetry loop is already started."
250 );
251
252 let inner = self.inner.clone();
253 let join_handle = tokio::task::spawn(async move {
254 loop {
256 let quit_loop = tokio::select! {
257 _ = (&mut terminate_command_rx) => { true }
258 _ = inner.clock.tick() => { false }
259 };
260 let _ = inner.send_pending_events().await;
261 if quit_loop {
262 break;
263 }
264 }
265 });
266 TelemetryLoopHandle::WithLoop {
267 join_handle,
268 terminate_command_tx,
269 }
270 }
271
272 pub async fn send(&self, event: TelemetryEvent) {
273 self.inner.send(event).await;
274 }
275}
276
277pub fn is_telemetry_enabled() -> bool {
279 std::env::var_os(crate::DISABLE_TELEMETRY_ENV_KEY).is_none()
280}
281
282fn create_http_client() -> Option<HttpClient> {
283 let client_opt = if is_telemetry_enabled() {
285 HttpClient::try_new()
286 } else {
287 None
288 };
289 if let Some(client) = client_opt.as_ref() {
290 info!("telemetry to {} is enabled.", client.endpoint());
291 } else {
292 info!("telemetry to quickwit is disabled.");
293 }
294 client_opt
295}
296
297impl Default for TelemetrySender {
298 fn default() -> Self {
299 let http_client = create_http_client();
300 TelemetrySender::new(http_client, Clock::periodical(TELEMETRY_PUSH_COOLDOWN))
301 }
302}
303
#[cfg(test)]
mod tests {

    use std::env;

    use super::*;

    // NOTE(review): presumably ignored because it mutates process-wide
    // environment variables, which can interfere with tests running in
    // parallel -- confirm before re-enabling.
    #[ignore]
    #[tokio::test]
    async fn test_enabling_and_disabling_telemetry() {
        // Setting the variable, even to an empty value, disables telemetry.
        env::set_var(crate::DISABLE_TELEMETRY_ENV_KEY, "");
        assert!(TelemetrySender::default().inner.is_disabled());
        env::remove_var(crate::DISABLE_TELEMETRY_ENV_KEY);
        assert!(!TelemetrySender::default().inner.is_disabled());
    }

    /// The manual clock starts with one tick queued, so the first event is
    /// pushed without pressing the button.
    #[tokio::test]
    async fn test_telemetry_no_wait_for_first_event() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (_clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        let payload = rx
            .recv()
            .await
            .expect("the first event should be pushed right away");
        assert_eq!(payload.events.len(), 1);
        loop_handler.terminate_telemetry().await;
    }

    /// One payload per tick when a single event is recorded after each tick.
    #[tokio::test]
    async fn test_telemetry_two_events() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 1);
        }
        clock_btn.tick().await;
        telemetry_sender.send(TelemetryEvent::Create).await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 1);
        }
        loop_handler.terminate_telemetry().await;
    }

    /// Events recorded during the cooldown are held back until the next tick
    /// and then sent together.
    #[tokio::test]
    async fn test_telemetry_cooldown_observed() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 1);
        }
        tokio::task::yield_now().await;
        telemetry_sender.send(TelemetryEvent::Create).await;

        // No tick yet: nothing may be pushed.
        let timeout_res = tokio::time::timeout(Duration::from_millis(1), rx.recv()).await;
        assert!(timeout_res.is_err());

        telemetry_sender.send(TelemetryEvent::Create).await;
        clock_btn.tick().await;
        {
            let payload = rx.recv().await.unwrap();
            assert_eq!(payload.events.len(), 2);
        }
        loop_handler.terminate_telemetry().await;
    }

    /// Terminating flushes buffered events without waiting for a tick.
    #[tokio::test]
    async fn test_terminate_telemetry_sends_pending_events() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let (_clock_btn, clock) = Clock::manual().await;
        let telemetry_sender = TelemetrySender::new(Some(tx), clock);
        let loop_handler = telemetry_sender.start_loop();
        telemetry_sender.send(TelemetryEvent::Create).await;
        let payload = rx.recv().await.unwrap();
        assert_eq!(payload.events.len(), 1);
        telemetry_sender
            .send(TelemetryEvent::EndCommand { return_code: 2i32 })
            .await;
        loop_handler.terminate_telemetry().await;
        let payload = rx.recv().await.unwrap();
        assert_eq!(payload.events.len(), 1);
        assert!(matches!(
            &payload.events[0].event,
            &TelemetryEvent::EndCommand { .. }
        ));
    }
}