tracing_layer_core/
worker.rs

1use std::fmt::Debug;
2use std::sync::Arc;
3
4use tokio::task::JoinHandle;
5use debug_print::debug_println;
6use tokio::sync::Mutex;
7
8use crate::{ChannelReceiver, ChannelSender, WebhookMessage};
9
/// Upper bound on the number of delivery attempts the worker makes for a single webhook
/// message.
///
/// The count includes the very first attempt: once this many POST attempts have failed, the
/// worker stops trying to deliver that message.
const MAX_RETRIES: usize = 10;
12
13/// This worker manages a background async task that schedules the network requests to send traces
14/// to the webhook on the running tokio runtime.
15///
16/// Ensure to invoke `.start()` before, and `.teardown()` after, your application code runs. This
17/// is required to ensure proper initialization and shutdown.
18///
19/// `tracing-layer-core` synchronously generates payloads to send to the webhook using the
20/// tracing events from the global subscriber. However, all network requests are offloaded onto
21/// an unbuffered channel and processed by a provided future acting as an asynchronous worker.
22#[derive(Clone)]
23pub struct BackgroundWorker {
24    /// The sender used to send messages to the worker task.
25    ///
26    /// This sender is used to send `WorkerMessage` instances to the worker for processing.
27    pub(crate) sender: ChannelSender,
28
29    /// A handle to the spawned worker task.
30    ///
31    /// This handle is used to await the completion of the worker task when shutting down.
32    /// The handle is stored in a `tokio::sync::Mutex` to ensure safe access across asynchronous contexts.
33    pub(crate) handle: Arc<Mutex<Option<JoinHandle<()>>>>,
34
35    /// The receiver for messages to be processed by the worker task.
36    ///
37    /// This receiver is wrapped in an `Arc<Mutex<>>` to allow shared mutable access
38    /// between the `start` function and the worker task.
39    pub(crate) rx: Arc<Mutex<ChannelReceiver>>,
40}
41
42impl BackgroundWorker {
43    /// Starts the background worker.
44    ///
45    /// This function should only be called once. Attempting to call `start` more than once
46    /// will lead to a deadlock, as the function internally locks the receiver mutex and
47    /// spawns a task to process messages.
48    pub async fn start(&self) {
49        let rx = self.rx.clone();
50        let future = async move {
51            let mut rx = rx.lock().await;
52            worker(&mut *rx).await;
53        };
54        let handle = tokio::spawn(future);
55        let mut guard = self.handle.lock().await;
56        *guard = Some(handle);
57    }
58
59    /// Initiates the shutdown of the background worker.
60    ///
61    /// Sends a shutdown message to the worker and waits for the worker task to complete.
62    /// If the worker task handle has already been dropped, an error message will be printed.
63    pub async fn shutdown(self) {
64        match self.sender.send(WorkerMessage::Shutdown) {
65            Ok(..) => {
66                debug_println!("webhook message worker shutdown");
67            }
68            Err(e) => {
69                println!("ERROR: failed to send shutdown message to webhook message worker: {}", e);
70            }
71        }
72        let mut guard = self.handle.lock().await;
73        if let Some(handle) = guard.take() {
74            let _ = handle.await;
75        } else {
76            println!("ERROR: async task handle to webhook message worker has been already dropped");
77        }
78    }
79
80}
81
82/// A command sent to a worker containing a new message that should be sent to a webhook endpoint.
83#[derive(Debug)]
84pub enum WorkerMessage {
85    Data(Box<dyn WebhookMessage>),
86    Shutdown,
87}
88
89/// Provides a background worker task that sends the messages generated by the layer.
90pub(crate) async fn worker(rx: &mut ChannelReceiver) {
91    let client = reqwest::Client::new();
92    while let Some(message) = rx.recv().await {
93        match message {
94            WorkerMessage::Data(payload) => {
95                let webhook_url = payload.webhook_url();
96                let payload_json = payload.serialize();
97                println!("sending webhook message: {}", &payload_json);
98
99                let mut retries = 0;
100                while retries < MAX_RETRIES {
101                    match client
102                        .post(webhook_url)
103                        .header("Content-Type", "application/json")
104                        .body(payload_json.clone())
105                        .send()
106                        .await
107                    {
108                        Ok(res) => {
109                            debug_println!("webhook message sent: {:?}", &res);
110                            let res_text = res.text().await.unwrap();
111                            debug_println!("webhook message response: {}", res_text);
112                            break; // Success, break out of the retry loop
113                        }
114                        Err(e) => {
115                            println!("ERROR: failed to send webhook message: {}", e);
116                        }
117                    };
118
119                    // Exponential backoff - increase the delay between retries
120                    let delay_ms = 2u64.pow(retries as u32) * 100;
121                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
122                    retries += 1;
123                }
124            }
125            WorkerMessage::Shutdown => {
126                break;
127            }
128        }
129    }
130}