use std::fmt::Debug;
use std::sync::Arc;
use debug_print::debug_println;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::{ChannelReceiver, ChannelSender, WebhookMessage};
const MAX_RETRIES: usize = 10;
#[derive(Clone)]
pub struct BackgroundWorker {
pub(crate) sender: ChannelSender,
pub(crate) handle: Arc<Mutex<Option<JoinHandle<()>>>>,
pub(crate) rx: Arc<Mutex<ChannelReceiver>>,
}
impl BackgroundWorker {
pub async fn start(&self) {
let rx = self.rx.clone();
let future = async move {
let mut rx = rx.lock().await;
worker(&mut rx).await;
};
let handle = tokio::spawn(future);
let mut guard = self.handle.lock().await;
*guard = Some(handle);
}
pub async fn shutdown(self) {
match self.sender.send(WorkerMessage::Shutdown) {
Ok(..) => {
debug_println!("webhook message worker shutdown");
}
Err(e) => {
#[cfg(feature = "log-errors")]
eprintln!(
"ERROR: failed to send shutdown message to webhook message worker: {}",
e
);
}
}
let mut guard = self.handle.lock().await;
if let Some(handle) = guard.take() {
let _ = handle.await;
} else {
#[cfg(feature = "log-errors")]
eprintln!("ERROR: async task handle to webhook message worker has been already dropped");
}
}
}
#[derive(Debug)]
pub enum WorkerMessage {
Data(Box<dyn WebhookMessage>),
Shutdown,
}
pub(crate) async fn worker(rx: &mut ChannelReceiver) {
let client = reqwest::Client::new();
while let Some(message) = rx.recv().await {
match message {
WorkerMessage::Data(payload) => {
let webhook_url = payload.webhook_url();
let payload_json = payload.serialize();
debug_println!("sending webhook message: {}", &payload_json);
let mut retries = 0;
while retries < MAX_RETRIES {
match client
.post(webhook_url)
.header("Content-Type", "application/json")
.body(payload_json.clone())
.send()
.await
{
Ok(res) => {
debug_println!("webhook message sent: {:?}", &res);
let res_text = res.text().await.unwrap();
debug_println!("webhook message response: {}", res_text);
break; }
Err(e) => {
#[cfg(feature = "log-errors")]
eprintln!("ERROR: failed to send webhook message: {}", e);
}
};
let delay_ms = 2u64.pow(retries as u32) * 100;
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
retries += 1;
}
}
WorkerMessage::Shutdown => {
break;
}
}
}
}