tracing_layer_core/worker.rs
1use std::fmt::Debug;
2use std::sync::Arc;
3
4use tokio::task::JoinHandle;
5use debug_print::debug_println;
6use tokio::sync::Mutex;
7
8use crate::{ChannelReceiver, ChannelSender, WebhookMessage};
9
/// Maximum number of delivery attempts (the initial request plus retries) per webhook message
11const MAX_RETRIES: usize = 10;
12
13/// This worker manages a background async task that schedules the network requests to send traces
14/// to the webhook on the running tokio runtime.
15///
16/// Ensure to invoke `.start()` before, and `.teardown()` after, your application code runs. This
17/// is required to ensure proper initialization and shutdown.
18///
19/// `tracing-layer-core` synchronously generates payloads to send to the webhook using the
20/// tracing events from the global subscriber. However, all network requests are offloaded onto
21/// an unbuffered channel and processed by a provided future acting as an asynchronous worker.
22#[derive(Clone)]
23pub struct BackgroundWorker {
24 /// The sender used to send messages to the worker task.
25 ///
26 /// This sender is used to send `WorkerMessage` instances to the worker for processing.
27 pub(crate) sender: ChannelSender,
28
29 /// A handle to the spawned worker task.
30 ///
31 /// This handle is used to await the completion of the worker task when shutting down.
32 /// The handle is stored in a `tokio::sync::Mutex` to ensure safe access across asynchronous contexts.
33 pub(crate) handle: Arc<Mutex<Option<JoinHandle<()>>>>,
34
35 /// The receiver for messages to be processed by the worker task.
36 ///
37 /// This receiver is wrapped in an `Arc<Mutex<>>` to allow shared mutable access
38 /// between the `start` function and the worker task.
39 pub(crate) rx: Arc<Mutex<ChannelReceiver>>,
40}
41
42impl BackgroundWorker {
43 /// Starts the background worker.
44 ///
45 /// This function should only be called once. Attempting to call `start` more than once
46 /// will lead to a deadlock, as the function internally locks the receiver mutex and
47 /// spawns a task to process messages.
48 pub async fn start(&self) {
49 let rx = self.rx.clone();
50 let future = async move {
51 let mut rx = rx.lock().await;
52 worker(&mut *rx).await;
53 };
54 let handle = tokio::spawn(future);
55 let mut guard = self.handle.lock().await;
56 *guard = Some(handle);
57 }
58
59 /// Initiates the shutdown of the background worker.
60 ///
61 /// Sends a shutdown message to the worker and waits for the worker task to complete.
62 /// If the worker task handle has already been dropped, an error message will be printed.
63 pub async fn shutdown(self) {
64 match self.sender.send(WorkerMessage::Shutdown) {
65 Ok(..) => {
66 debug_println!("webhook message worker shutdown");
67 }
68 Err(e) => {
69 println!("ERROR: failed to send shutdown message to webhook message worker: {}", e);
70 }
71 }
72 let mut guard = self.handle.lock().await;
73 if let Some(handle) = guard.take() {
74 let _ = handle.await;
75 } else {
76 println!("ERROR: async task handle to webhook message worker has been already dropped");
77 }
78 }
79
80}
81
82/// A command sent to a worker containing a new message that should be sent to a webhook endpoint.
83#[derive(Debug)]
84pub enum WorkerMessage {
85 Data(Box<dyn WebhookMessage>),
86 Shutdown,
87}
88
89/// Provides a background worker task that sends the messages generated by the layer.
90pub(crate) async fn worker(rx: &mut ChannelReceiver) {
91 let client = reqwest::Client::new();
92 while let Some(message) = rx.recv().await {
93 match message {
94 WorkerMessage::Data(payload) => {
95 let webhook_url = payload.webhook_url();
96 let payload_json = payload.serialize();
97 println!("sending webhook message: {}", &payload_json);
98
99 let mut retries = 0;
100 while retries < MAX_RETRIES {
101 match client
102 .post(webhook_url)
103 .header("Content-Type", "application/json")
104 .body(payload_json.clone())
105 .send()
106 .await
107 {
108 Ok(res) => {
109 debug_println!("webhook message sent: {:?}", &res);
110 let res_text = res.text().await.unwrap();
111 debug_println!("webhook message response: {}", res_text);
112 break; // Success, break out of the retry loop
113 }
114 Err(e) => {
115 println!("ERROR: failed to send webhook message: {}", e);
116 }
117 };
118
119 // Exponential backoff - increase the delay between retries
120 let delay_ms = 2u64.pow(retries as u32) * 100;
121 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
122 retries += 1;
123 }
124 }
125 WorkerMessage::Shutdown => {
126 break;
127 }
128 }
129 }
130}