use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use reqwest::Client;
use serde::Serialize;
use tokio::sync::{RwLock, mpsc};
use crate::events::{DownloadEvent, EventFilter, RetryStrategy};
use crate::utils::retry::RetryPolicy;
/// HTTP method used when sending a webhook request.
///
/// `Post` is the default variant.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Default)]
pub enum WebhookMethod {
/// Send the webhook with an HTTP `POST` request (the default).
#[default]
Post,
/// Send the webhook with an HTTP `PUT` request.
Put,
/// Send the webhook with an HTTP `PATCH` request.
Patch,
}
/// Configuration of a single webhook endpoint: where events are sent, how the
/// request is built, and which events are forwarded.
#[derive(Debug, Clone)]
pub struct WebhookConfig {
/// Destination URL the webhook request is sent to.
url: String,
/// HTTP method used for the request.
method: WebhookMethod,
/// Extra headers added to every request for this webhook.
headers: HashMap<String, String>,
/// Only events matching this filter are queued for this webhook.
filter: EventFilter,
/// Retry policy wrapped around each delivery.
retry_policy: RetryPolicy,
/// Timeout applied to each individual HTTP request.
timeout: Duration,
/// When `true`, the serialized event is embedded in the payload's `data` field.
include_full_data: bool,
}
impl WebhookConfig {
    /// Creates a configuration for `url` with default settings.
    ///
    /// Defaults: [`WebhookMethod::Post`], no extra headers, the filter returned
    /// by `EventFilter::all()`, `RetryPolicy::default()`, a 10 second request
    /// timeout, and full event data included in the payload.
    pub fn new(url: impl Into<String>) -> Self {
        let url_string = url.into();
        tracing::debug!(
            url = %url_string,
            "⚙️ Creating new WebhookConfig"
        );
        Self {
            url: url_string,
            method: WebhookMethod::default(),
            headers: HashMap::new(),
            filter: EventFilter::all(),
            retry_policy: RetryPolicy::default(),
            timeout: Duration::from_secs(10),
            include_full_data: true,
        }
    }

    /// Builds a configuration from environment variables.
    ///
    /// * `YTDLP_WEBHOOK_URL` (required): destination URL.
    /// * `YTDLP_WEBHOOK_METHOD` (optional): `POST`, `PUT` or `PATCH`,
    ///   case-insensitive. Any other value falls back to `POST` and is
    ///   reported with a warning.
    /// * `YTDLP_WEBHOOK_TIMEOUT` (optional): request timeout in whole seconds.
    ///   A value that does not parse as an unsigned integer is reported with
    ///   a warning and the default timeout is kept.
    ///
    /// Surrounding whitespace in the method and timeout values is ignored.
    ///
    /// Returns `None` when `YTDLP_WEBHOOK_URL` is unset, is not valid Unicode,
    /// or is blank.
    pub fn from_env() -> Option<Self> {
        tracing::debug!("⚙️ Loading WebhookConfig from environment");
        let url = std::env::var("YTDLP_WEBHOOK_URL").ok()?;
        // A blank URL can never be delivered to; treat it like an unset
        // variable instead of registering a webhook that always fails.
        if url.trim().is_empty() {
            tracing::warn!("YTDLP_WEBHOOK_URL is set but blank, ignoring webhook configuration");
            return None;
        }
        tracing::debug!(
            url = %url,
            "⚙️ Found YTDLP_WEBHOOK_URL in environment"
        );
        let mut config = Self::new(url);

        if let Ok(raw_method) = std::env::var("YTDLP_WEBHOOK_METHOD") {
            config.method = match raw_method.trim().to_ascii_uppercase().as_str() {
                "POST" => WebhookMethod::Post,
                "PUT" => WebhookMethod::Put,
                "PATCH" => WebhookMethod::Patch,
                _ => {
                    tracing::warn!(
                        value = %raw_method,
                        "Unrecognized YTDLP_WEBHOOK_METHOD, falling back to POST"
                    );
                    WebhookMethod::Post
                }
            };
        }

        if let Ok(raw_timeout) = std::env::var("YTDLP_WEBHOOK_TIMEOUT") {
            match raw_timeout.trim().parse::<u64>() {
                Ok(timeout_secs) => {
                    config.timeout = Duration::from_secs(timeout_secs);
                }
                Err(e) => {
                    tracing::warn!(
                        value = %raw_timeout,
                        error = %e,
                        "Invalid YTDLP_WEBHOOK_TIMEOUT, keeping default timeout"
                    );
                }
            }
        }

        tracing::debug!(
            url = %config.url,
            method = ?config.method,
            timeout_secs = config.timeout.as_secs(),
            "✅ WebhookConfig created from environment"
        );
        Some(config)
    }

    /// Sets the HTTP method used for requests.
    pub fn with_method(mut self, method: WebhookMethod) -> Self {
        self.method = method;
        self
    }

    /// Adds one request header, replacing any existing header stored under
    /// the same key.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }

    /// Merges `headers` into the configured headers; entries in `headers`
    /// replace existing entries with the same key.
    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
        self.headers.extend(headers);
        self
    }

    /// Replaces the event filter deciding which events are sent to this webhook.
    pub fn with_filter(mut self, filter: EventFilter) -> Self {
        self.filter = filter;
        self
    }

    /// Replaces the retry policy with one built from `strategy`.
    pub fn with_retry_strategy(mut self, strategy: RetryStrategy) -> Self {
        self.retry_policy = RetryPolicy::builder()
            // NOTE(review): `as u32` silently truncates if `max_attempts` is a
            // wider integer type; its declaration is not visible here, confirm.
            .max_attempts(strategy.max_attempts as u32)
            .initial_delay(strategy.initial_delay)
            .max_delay(strategy.max_delay)
            .backoff_factor(strategy.backoff_multiplier)
            .build();
        self
    }

    /// Sets the timeout applied to each HTTP request.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Chooses whether the serialized event is embedded in the payload.
    pub fn with_full_data(mut self, include: bool) -> Self {
        self.include_full_data = include;
        self
    }

    /// Returns the destination URL.
    pub fn url(&self) -> &str {
        &self.url
    }

    /// Returns the event filter of this webhook.
    pub fn filter(&self) -> &EventFilter {
        &self.filter
    }
}
/// Writes the variant name (`Post`, `Put` or `Patch`) verbatim; width and
/// padding flags of the formatter are not applied.
impl std::fmt::Display for WebhookMethod {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Post => "Post",
            Self::Put => "Put",
            Self::Patch => "Patch",
        };
        f.write_str(label)
    }
}
/// One-line summary showing the URL, the method and the timeout in whole
/// seconds.
impl std::fmt::Display for WebhookConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Sub-second parts of the timeout are dropped by `as_secs`.
        let timeout_secs = self.timeout.as_secs();
        f.write_fmt(format_args!(
            "WebhookConfig(url={}, method={}, timeout={}s)",
            self.url, self.method, timeout_secs
        ))
    }
}
/// JSON body sent to a webhook endpoint for one event.
#[derive(Debug, Clone, Serialize)]
struct WebhookPayload {
/// Event type name, taken from `DownloadEvent::event_type()`.
event_type: String,
/// Download identifier reported by the event, if it has one.
download_id: Option<u64>,
/// RFC 3339 UTC timestamp taken when the payload is built for delivery.
timestamp: String,
/// Full serialized event; the field is left out of the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<serde_json::Value>,
}
/// Webhook dispatcher: holds the registered webhook configurations and queues
/// matching events for a background delivery task.
pub struct WebhookDelivery {
/// HTTP client handed to the delivery worker for all requests.
client: Arc<Client>,
/// Registered webhook configurations, shared between clones of this handle.
webhooks: Arc<RwLock<Vec<WebhookConfig>>>,
/// Sending half of the queue consumed by the delivery worker task.
tx: mpsc::Sender<(WebhookConfig, DownloadEvent)>,
}
impl WebhookDelivery {
pub fn new() -> Self {
tracing::debug!("⚙️ Creating new WebhookDelivery system");
let client = crate::utils::http::build_http_client(crate::utils::http::HttpClientConfig {
timeout: Some(Duration::from_secs(30)),
..Default::default()
})
.unwrap_or_else(|_| Arc::new(Client::new()));
let (tx, mut rx) = mpsc::channel::<(WebhookConfig, DownloadEvent)>(1024);
let webhooks = Arc::new(RwLock::new(Vec::new()));
let client_clone = client.clone();
tracing::debug!("⚙️ Spawning webhook delivery worker task");
const MAX_CONCURRENT_DELIVERIES: usize = 16;
let delivery_semaphore = Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_DELIVERIES));
tokio::spawn(async move {
tracing::debug!("⚙️ Webhook delivery worker started");
while let Some((config, event)) = rx.recv().await {
let client = client_clone.clone();
let permit = delivery_semaphore.clone().acquire_owned().await;
let Ok(permit) = permit else {
tracing::warn!("Webhook semaphore closed, stopping delivery worker");
break;
};
tokio::spawn(async move {
let _permit = permit;
Self::deliver_webhook(client, config, event).await;
});
}
tracing::debug!("⚙️ Webhook delivery worker stopped");
});
Self { client, webhooks, tx }
}
pub async fn register(&self, config: WebhookConfig) {
tracing::debug!(
url = %config.url,
method = ?config.method,
"🔔 Registering new webhook"
);
let mut webhooks = self.webhooks.write().await;
webhooks.push(config);
tracing::debug!(total_webhooks = webhooks.len(), "✅ Webhook registered");
}
pub async fn process_event(&self, event: &DownloadEvent) {
tracing::debug!(
event_type = event.event_type(),
download_id = event.download_id(),
"🔔 Processing event for webhook delivery"
);
let webhooks = self.webhooks.read().await;
let mut matched_count = 0;
for webhook in webhooks.iter() {
if webhook.filter.matches(event) {
matched_count += 1;
if let Err(e) = self.tx.try_send((webhook.clone(), event.clone())) {
tracing::warn!(error = %e, "Webhook channel full, dropping event");
}
}
}
tracing::debug!(
event_type = event.event_type(),
total_webhooks = webhooks.len(),
matched_webhooks = matched_count,
"✅ Event processed for webhook delivery"
);
}
pub async fn count(&self) -> usize {
let webhooks = self.webhooks.read().await;
webhooks.len()
}
pub async fn clear(&self) {
tracing::debug!("⚙️ Clearing all webhooks");
let mut webhooks = self.webhooks.write().await;
let count = webhooks.len();
webhooks.clear();
tracing::debug!(webhooks_cleared = count, "✅ All webhooks cleared");
}
async fn deliver_webhook(client: Arc<Client>, config: WebhookConfig, event: DownloadEvent) {
tracing::debug!(
url = %config.url,
event_type = event.event_type(),
download_id = event.download_id(),
"🔔 Starting webhook delivery"
);
let payload = WebhookPayload {
event_type: event.event_type().to_string(),
download_id: event.download_id(),
timestamp: chrono::Utc::now().to_rfc3339(),
data: if config.include_full_data {
serde_json::to_value(&event).ok()
} else {
None
},
};
let policy = config.retry_policy.clone();
let result = policy
.execute_with_condition(
|| {
let client = &client;
let config = &config;
let payload = &payload;
async move { Self::send_webhook(client, config, payload).await }
},
|e: &String| {
if e.starts_with("HTTP 4") && !e.starts_with("HTTP 429") {
return false;
}
true
},
)
.await;
match result {
Ok(_) => {
tracing::debug!(url = config.url, "✅ Webhook delivered successfully");
}
Err(e) => {
tracing::error!(url = config.url, error = %e, "Webhook delivery failed after retries");
}
}
}
async fn send_webhook(client: &Client, config: &WebhookConfig, payload: &WebhookPayload) -> Result<(), String> {
tracing::debug!(
url = %config.url,
method = ?config.method,
event_type = %payload.event_type,
"🔔 Sending webhook request"
);
let mut request = match config.method {
WebhookMethod::Post => client.post(&config.url),
WebhookMethod::Put => client.put(&config.url),
WebhookMethod::Patch => client.patch(&config.url),
};
for (key, value) in &config.headers {
request = request.header(key, value);
}
request = request.header("Content-Type", "application/json");
request = request.json(payload);
request = request.timeout(config.timeout);
let response = request.send().await.map_err(|e| format!("Request failed: {}", e))?;
if !response.status().is_success() {
let status = response.status();
tracing::warn!(
url = %config.url,
status_code = status.as_u16(),
"🔔 Webhook request failed"
);
return Err(format!("HTTP {}", status));
}
tracing::debug!(
url = %config.url,
status_code = response.status().as_u16(),
"✅ Webhook request succeeded"
);
Ok(())
}
}
impl Default for WebhookDelivery {
fn default() -> Self {
Self::new()
}
}
impl Clone for WebhookDelivery {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
webhooks: self.webhooks.clone(),
tx: self.tx.clone(),
}
}
}
/// Debug output with a fixed `"<async>"` placeholder for the webhook count,
/// since the list sits behind an async lock that `fmt` does not read.
impl std::fmt::Debug for WebhookDelivery {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("WebhookDelivery");
        out.field("webhooks_count", &"<async>");
        out.finish()
    }
}
/// Displays as the fixed type name `WebhookDelivery`.
impl std::fmt::Display for WebhookDelivery {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "WebhookDelivery")
    }
}