Skip to main content

quickwit_telemetry/
sender.rs

1// Copyright (C) 2021 Quickwit, Inc.
2//
3// Quickwit is offered under the AGPL v3.0 and as commercial software.
4// For commercial licensing, contact us at hello@quickwit.io.
5//
6// AGPL:
7// This program is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Affero General Public License as
9// published by the Free Software Foundation, either version 3 of the
10// License, or (at your option) any later version.
11//
12// This program is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Affero General Public License for more details.
16//
17// You should have received a copy of the GNU Affero General Public License
18// along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20use std::mem;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23use std::time::Duration;
24
25use tokio::sync::mpsc::{Receiver, Sender};
26use tokio::sync::{oneshot, Mutex, RwLock};
27use tokio::task::JoinHandle;
28use tokio::time::Interval;
29use tracing::info;
30
31use crate::payload::{ClientInformation, EventWithTimestamp, TelemetryEvent, TelemetryPayload};
32use crate::sink::{HttpClient, Sink};
33
/// Period of the telemetry loop clock: at most one request per minute.
const TELEMETRY_PUSH_COOLDOWN: Duration = Duration::from_secs(60);

/// Upon termination of the program, we send one last telemetry request with pending events.
/// This duration is the maximum amount of time we wait for that last telemetry request.
const LAST_REQUEST_TIMEOUT: Duration = Duration::from_secs(1);

/// Maximum number of events kept in the queue between two pushes.
/// Events pushed while the queue is full are discarded and only counted.
const MAX_NUM_EVENTS_IN_QUEUE: usize = 10;
42
/// Test-only handle used to fire ticks on a manual `Clock`.
#[cfg(test)]
struct ClockButton(Sender<()>);

#[cfg(test)]
impl ClockButton {
    /// Sends one tick to the paired manual clock.
    ///
    /// A failed send is deliberately ignored.
    async fn tick(&self) {
        let Self(tick_tx) = self;
        let _ = tick_tx.send(()).await;
    }
}
52
53enum Clock {
54    Periodical(Mutex<Interval>),
55    #[cfg(test)]
56    Manual(Mutex<Receiver<()>>),
57}
58
59impl Clock {
60    pub fn periodical(period: Duration) -> Clock {
61        let interval = tokio::time::interval(period);
62        Clock::Periodical(Mutex::new(interval))
63    }
64
65    #[cfg(test)]
66    pub async fn manual() -> (ClockButton, Clock) {
67        let (tx, rx) = tokio::sync::mpsc::channel(1);
68        let _ = tx.send(()).await;
69        let button = ClockButton(tx);
70        (button, Clock::Manual(Mutex::new(rx)))
71    }
72
73    async fn tick(&self) {
74        match self {
75            Clock::Periodical(interval) => {
76                interval.lock().await.tick().await;
77            }
78            #[cfg(test)]
79            Clock::Manual(channel) => {
80                channel.lock().await.recv().await;
81            }
82        }
83    }
84}
85
86#[derive(Default)]
87struct EventsState {
88    events: Vec<EventWithTimestamp>,
89    num_dropped_events: usize,
90}
91
92impl EventsState {
93    fn drain_events(&mut self) -> EventsState {
94        mem::replace(
95            self,
96            EventsState {
97                events: Vec::new(),
98                num_dropped_events: 0,
99            },
100        )
101    }
102
103    /// Adds an event.
104    /// If the queue is already saturated, (ie. it has reached the len `MAX_NUM_EVENTS_IN_QUEUE`)
105    // Returns true iff it was the first event in the queue.
106    fn push_event(&mut self, event: TelemetryEvent) -> bool {
107        if self.events.len() >= MAX_NUM_EVENTS_IN_QUEUE {
108            self.num_dropped_events += 1;
109            return false;
110        }
111        let events_was_empty = self.events.is_empty();
112        self.events.push(EventWithTimestamp::from(event));
113        events_was_empty
114    }
115}
116
117struct Events {
118    state: RwLock<EventsState>,
119    items_available_tx: Sender<()>,
120    items_available_rx: RwLock<Receiver<()>>,
121}
122
123impl Default for Events {
124    fn default() -> Self {
125        let (items_available_tx, items_available_rx) = tokio::sync::mpsc::channel(1);
126        Events {
127            state: RwLock::new(EventsState::default()),
128            items_available_tx,
129            items_available_rx: RwLock::new(items_available_rx),
130        }
131    }
132}
133
134impl Events {
135    /// Wait for events to be available (if there are pending events, then do not wait)
136    /// and then send them to the ingest API server.
137    async fn drain_events(&self) -> EventsState {
138        self.items_available_rx.write().await.recv().await;
139        self.state.write().await.drain_events()
140    }
141
142    async fn push_event(&self, event: TelemetryEvent) {
143        let is_first_event = self.state.write().await.push_event(event);
144        if is_first_event {
145            let _ = self.items_available_tx.send(()).await;
146        }
147    }
148}
149
150pub(crate) struct Inner {
151    sink: Option<Box<dyn Sink>>,
152    client_information: ClientInformation,
153    /// This channel is just used to signal there are new items available.
154    events: Events,
155    clock: Clock,
156    is_started: AtomicBool,
157}
158
159impl Inner {
160    pub fn is_disabled(&self) -> bool {
161        self.sink.is_none()
162    }
163
164    async fn create_telemetry_payload(&self) -> TelemetryPayload {
165        let events_state = self.events.drain_events().await;
166        TelemetryPayload {
167            client_information: self.client_information.clone(),
168            events: events_state.events,
169            num_dropped_events: events_state.num_dropped_events,
170        }
171    }
172
173    /// Wait for events to be available (if there are pending events, then do not wait)
174    /// and then send them to the ingest API server.
175    ///
176    /// If the requests fails, it fails silently.
177    async fn send_pending_events(&self) {
178        if let Some(sink) = self.sink.as_ref() {
179            let payload = self.create_telemetry_payload().await;
180            sink.send_payload(payload).await;
181        }
182    }
183
184    async fn send(&self, event: TelemetryEvent) {
185        if self.is_disabled() {
186            return;
187        }
188        self.events.push_event(event).await;
189    }
190}
191
/// Entry point used to record telemetry events and to start the telemetry loop.
pub struct TelemetrySender {
    /// Shared state, also handed to the task spawned by `start_loop`.
    pub(crate) inner: Arc<Inner>,
}
195
196pub enum TelemetryLoopHandle {
197    NoLoop,
198    WithLoop {
199        join_handle: JoinHandle<()>,
200        terminate_command_tx: oneshot::Sender<()>,
201    },
202}
203
204impl TelemetryLoopHandle {
205    /// Terminate telemetry will exit the telemetry loop
206    /// and possibly send the last request, possibly ignoring the
207    /// telemetry cooldown.
208    pub async fn terminate_telemetry(self) {
209        if let Self::WithLoop {
210            join_handle,
211            terminate_command_tx,
212        } = self
213        {
214            let _ = terminate_command_tx.send(());
215            let _ = tokio::time::timeout(LAST_REQUEST_TIMEOUT, join_handle).await;
216        }
217    }
218}
219
220impl TelemetrySender {
221    fn new<S: Sink>(sink_opt: Option<S>, clock: Clock) -> TelemetrySender {
222        let sink_opt: Option<Box<dyn Sink>> = if let Some(sink) = sink_opt {
223            Some(Box::new(sink))
224        } else {
225            None
226        };
227        TelemetrySender {
228            inner: Arc::new(Inner {
229                sink: sink_opt,
230                client_information: ClientInformation::default(),
231                events: Events::default(),
232                clock,
233                is_started: AtomicBool::new(false),
234            }),
235        }
236    }
237
238    pub fn start_loop(&self) -> TelemetryLoopHandle {
239        let (terminate_command_tx, mut terminate_command_rx) = oneshot::channel();
240        if self.inner.is_disabled() {
241            return TelemetryLoopHandle::NoLoop;
242        }
243
244        assert!(
245            self.inner
246                .is_started
247                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
248                .is_ok(),
249            "The telemetry loop is already started."
250        );
251
252        let inner = self.inner.clone();
253        let join_handle = tokio::task::spawn(async move {
254            // This channel is used to send the command to terminate telemetry.
255            loop {
256                let quit_loop = tokio::select! {
257                    _ = (&mut terminate_command_rx) => { true }
258                    _ = inner.clock.tick() => { false }
259                };
260                let _ = inner.send_pending_events().await;
261                if quit_loop {
262                    break;
263                }
264            }
265        });
266        TelemetryLoopHandle::WithLoop {
267            join_handle,
268            terminate_command_tx,
269        }
270    }
271
272    pub async fn send(&self, event: TelemetryEvent) {
273        self.inner.send(event).await;
274    }
275}
276
277/// Check to see if telemetry is enabled.
278pub fn is_telemetry_enabled() -> bool {
279    std::env::var_os(crate::DISABLE_TELEMETRY_ENV_KEY).is_none()
280}
281
282fn create_http_client() -> Option<HttpClient> {
283    // TODO add telemetry URL.
284    let client_opt = if is_telemetry_enabled() {
285        HttpClient::try_new()
286    } else {
287        None
288    };
289    if let Some(client) = client_opt.as_ref() {
290        info!("telemetry to {} is enabled.", client.endpoint());
291    } else {
292        info!("telemetry to quickwit is disabled.");
293    }
294    client_opt
295}
296
297impl Default for TelemetrySender {
298    fn default() -> Self {
299        let http_client = create_http_client();
300        TelemetrySender::new(http_client, Clock::periodical(TELEMETRY_PUSH_COOLDOWN))
301    }
302}
303
#[cfg(test)]
mod tests {

    use std::env;

    use super::*;

    /// Telemetry follows the presence of the `DISABLE_TELEMETRY_ENV_KEY` variable.
    #[ignore]
    #[tokio::test]
    async fn test_enabling_and_disabling_telemetry() {
        // We group the two in a single test to ensure it happens on the same thread.
        env::set_var(crate::DISABLE_TELEMETRY_ENV_KEY, "");
        assert!(TelemetrySender::default().inner.is_disabled());
        env::remove_var(crate::DISABLE_TELEMETRY_ENV_KEY);
        assert!(!TelemetrySender::default().inner.is_disabled());
    }

    /// The first event is sent without pressing the clock button.
    #[tokio::test]
    async fn test_telemetry_no_wait_for_first_event() {
        let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_button, clock) = Clock::manual().await;
        let sender = TelemetrySender::new(Some(sink_tx), clock);
        let loop_handle = sender.start_loop();
        sender.send(TelemetryEvent::Create).await;
        let received = sink_rx.recv().await;
        assert!(received.is_some());
        assert_eq!(received.unwrap().events.len(), 1);
        loop_handle.terminate_telemetry().await;
    }

    /// Two events separated by a tick end up in two payloads.
    #[tokio::test]
    async fn test_telemetry_two_events() {
        let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel();
        let (button, clock) = Clock::manual().await;
        let sender = TelemetrySender::new(Some(sink_tx), clock);
        let loop_handle = sender.start_loop();

        sender.send(TelemetryEvent::Create).await;
        let first_payload = sink_rx.recv().await.unwrap();
        assert_eq!(first_payload.events.len(), 1);

        button.tick().await;
        sender.send(TelemetryEvent::Create).await;
        let second_payload = sink_rx.recv().await.unwrap();
        assert_eq!(second_payload.events.len(), 1);

        loop_handle.terminate_telemetry().await;
    }

    /// Events sent between two ticks are held back, then sent together on the next tick.
    #[tokio::test]
    async fn test_telemetry_cooldown_observed() {
        let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel();
        let (button, clock) = Clock::manual().await;
        let sender = TelemetrySender::new(Some(sink_tx), clock);
        let loop_handle = sender.start_loop();

        sender.send(TelemetryEvent::Create).await;
        let first_payload = sink_rx.recv().await.unwrap();
        assert_eq!(first_payload.events.len(), 1);

        tokio::task::yield_now().await;
        sender.send(TelemetryEvent::Create).await;

        // No tick happened: nothing must reach the sink.
        let recv_with_timeout = tokio::time::timeout(Duration::from_millis(1), sink_rx.recv());
        assert!(recv_with_timeout.await.is_err());

        sender.send(TelemetryEvent::Create).await;
        button.tick().await;
        let second_payload = sink_rx.recv().await.unwrap();
        assert_eq!(second_payload.events.len(), 2);

        loop_handle.terminate_telemetry().await;
    }

    /// Terminating the loop sends the events that are still pending, without a tick.
    #[tokio::test]
    async fn test_terminate_telemetry_sends_pending_events() {
        let (sink_tx, mut sink_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_button, clock) = Clock::manual().await;
        let sender = TelemetrySender::new(Some(sink_tx), clock);
        let loop_handle = sender.start_loop();

        sender.send(TelemetryEvent::Create).await;
        let first_payload = sink_rx.recv().await.unwrap();
        assert_eq!(first_payload.events.len(), 1);

        let end_command = TelemetryEvent::EndCommand { return_code: 2i32 };
        sender.send(end_command).await;
        loop_handle.terminate_telemetry().await;

        let last_payload = sink_rx.recv().await.unwrap();
        assert_eq!(last_payload.events.len(), 1);
        let last_event = &last_payload.events[0].event;
        assert!(matches!(last_event, &TelemetryEvent::EndCommand { .. }));
    }
}
401}