use std::fmt::Debug;
use std::sync::Arc;
use tokio::task::JoinHandle;
use debug_print::debug_println;
use tokio::sync::Mutex;
use crate::{ChannelReceiver, ChannelSender, WebhookMessage};
/// Maximum number of retries for failed requests
const MAX_RETRIES: usize = 10;
/// This worker manages a background async task that schedules the network requests that send
/// traces to Discord on the running tokio runtime.
///
/// Make sure to invoke `.startup()` before, and `.shutdown()` after, your application code runs.
/// This is required to ensure proper initialization and shutdown.
///
/// `tracing-layer-discord` synchronously generates payloads to send to the Discord API using the
/// tracing events from the global subscriber. However, all network requests are offloaded onto
/// an unbounded channel and processed by a provided future acting as an asynchronous worker.
pub struct BackgroundWorker {
    pub(crate) sender: ChannelSender,
    pub(crate) handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl BackgroundWorker {
    /// Initiate the worker's shutdown sequence and wait for the worker task to finish.
    ///
    /// Without invoking `.shutdown()`, your application may exit before all Discord messages can
    /// be sent.
    pub async fn shutdown(self) {
        match self.sender.send(WorkerMessage::Shutdown) {
            Ok(..) => {
                debug_println!("webhook message worker shutdown");
            }
            Err(e) => {
                println!("ERROR: failed to send shutdown message to webhook message worker: {}", e);
            }
        }
        // Take the join handle out of the shared slot and wait for the worker task to exit.
        let mut guard = self.handle.lock().await;
        if let Some(handle) = guard.take() {
            let _ = handle.await;
        } else {
            println!("ERROR: async task handle to webhook message worker has already been dropped");
        }
    }
}
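/// A command sent to the worker: either a new message that should be sent to a webhook endpoint,
/// or a request to shut down.
#[derive(Debug)]
pub enum WorkerMessage {
    /// A message to deliver to its webhook endpoint.
    Data(Box<dyn WebhookMessage>),
    /// Stop the worker. Messages queued before this one are processed first.
    Shutdown,
}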
/// Provides a background worker task that sends the messages generated by the
/// layer.
pub(crate) async fn worker(mut rx: ChannelReceiver) {
    let client = reqwest::Client::new();
    while let Some(message) = rx.recv().await {
        match message {
            WorkerMessage::Data(payload) => {
                let webhook_url = payload.webhook_url();
                let payload_json = payload.serialize();
                println!("sending discord message: {}", &payload_json);

                let mut retries = 0;
                while retries < MAX_RETRIES {
                    match client
                        .post(webhook_url)
                        .header("Content-Type", "application/json")
                        .body(payload_json.clone())
                        .send()
                        .await
                    {
                        Ok(res) => {
                            debug_println!("webhook message sent: {:?}", &res);
                            // Reading the body can fail; report it instead of panicking the worker.
                            match res.text().await {
                                Ok(res_text) => {
                                    debug_println!("webhook message response: {}", res_text);
                                }
                                Err(e) => {
                                    println!("ERROR: failed to read webhook response body: {}", e);
                                }
                            }
                            // A response arrived (this includes non-2xx statuses), so leave the
                            // retry loop.
                            break;
                        }
                        Err(e) => {
                            println!("ERROR: failed to send webhook message: {}", e);
                        }
                    };

                    // Exponential backoff: 100 ms after the first failure, doubling each retry.
                    let delay_ms = 2u64.pow(retries as u32) * 100;
                    tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                    retries += 1;
                }
            }
            WorkerMessage::Shutdown => {
                break;
            }
        }
    }
}
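
// Illustrative sketch only: the retry loop in `worker` waits `2^retries * 100` ms after each
// failed attempt. The two functions below restate that schedule in isolation so the timing is
// easy to inspect. `backoff_delay_ms` and `total_backoff_ms` are hypothetical helpers introduced
// here for illustration; they are not part of the crate's API and `worker` does not call them.

/// Delay in milliseconds applied after the failed attempt with index `retry` (0-based).
#[allow(dead_code)]
fn backoff_delay_ms(retry: u32) -> u64 {
    // Same expression as in the retry loop: 100 ms base delay, doubled on every retry.
    2u64.pow(retry) * 100
}

/// Worst-case total time in milliseconds spent sleeping if every one of `max_retries` attempts
/// fails. With the retry limit of 10 used in this file, this comes to 102_300 ms.
#[allow(dead_code)]
fn total_backoff_ms(max_retries: u32) -> u64 {
    (0..max_retries).map(backoff_delay_ms).sum()
}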