Skip to main content

sockudo_webhook/
sender.rs

1use sockudo_core::app::App;
2use sockudo_core::app::AppManager;
3use sockudo_core::error::{Error, Result};
4
5#[cfg(feature = "lambda")]
6use crate::lambda_sender::LambdaWebhookSender;
7use ahash::AHashMap;
8use reqwest::{Client, header};
9use sockudo_core::token::Token;
10use sockudo_core::webhook_types::{
11    JobData, JobPayload, PusherWebhookPayload, Webhook, WebhookFilter,
12};
13use sonic_rs::Value;
14#[cfg(feature = "lambda")]
15use sonic_rs::json;
16use sonic_rs::prelude::*;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Semaphore;
20use tracing::{debug, error, info, warn};
21
/// Number of permits the webhook semaphore is created with in
/// `WebhookSender::new`. One permit is acquired per webhook task, so this
/// bounds how many webhook deliveries can be in flight at the same time.
const MAX_CONCURRENT_WEBHOOKS: usize = 20;
23
/// Parameters for creating an HTTP webhook task.
///
/// Groups the values `create_http_webhook_task` moves into the spawned task.
struct HttpWebhookTaskParams {
    /// Destination URL the webhook is POSTed to.
    url: url::Url,
    /// Webhook configuration; its optional custom headers are added to the request.
    webhook_config: Webhook,
    /// Concurrency permit, kept alive inside the spawned task until it finishes.
    permit: tokio::sync::OwnedSemaphorePermit,
    /// App key sent in the `X-Pusher-Key` header.
    app_key: String,
    /// Signature of the body, sent in the `X-Pusher-Signature` header.
    signature: String,
    /// Serialized JSON request body.
    body_to_send: String,
}
33
/// Delivers webhook jobs to the HTTP endpoints (and, with the `lambda`
/// feature, the Lambda functions) configured for an app.
pub struct WebhookSender {
    /// HTTP client used for every outgoing webhook request.
    client: Client,
    /// Used to look up an app (and its webhook configuration) by app ID.
    app_manager: Arc<dyn AppManager + Send + Sync>,
    /// Invokes Lambda-backed webhooks; only present with the `lambda` feature.
    #[cfg(feature = "lambda")]
    lambda_sender: LambdaWebhookSender,
    /// Limits how many webhook tasks run concurrently; one permit per task.
    webhook_semaphore: Arc<Semaphore>,
}
41
42impl WebhookSender {
43    pub fn new(app_manager: Arc<dyn AppManager + Send + Sync>) -> Self {
44        let client = Client::builder()
45            .timeout(Duration::from_secs(10))
46            .build()
47            .unwrap_or_default();
48        Self {
49            client,
50            app_manager,
51            #[cfg(feature = "lambda")]
52            lambda_sender: LambdaWebhookSender::new(),
53            webhook_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_WEBHOOKS)),
54        }
55    }
56
57    async fn get_app_config(&self, app_id: &str) -> Result<App> {
58        match self.app_manager.find_by_id(app_id).await? {
59            Some(app) => Ok(app),
60            None => {
61                error!("Webhook: Failed to find app with ID: {}", app_id);
62                Err(Error::InvalidAppKey)
63            }
64        }
65    }
66
67    async fn validate_webhook_job(&self, app_id: &str, events: &[Value]) -> Result<()> {
68        if events.is_empty() {
69            warn!("Webhook job for app {} has no events.", app_id);
70            return Ok(());
71        }
72        Ok(())
73    }
74
75    fn create_pusher_payload(&self, job: &JobData) -> Result<(PusherWebhookPayload, String)> {
76        let pusher_payload = PusherWebhookPayload {
77            time_ms: job.payload.time_ms,
78            events: job.payload.events.clone(),
79        };
80
81        let body_json_string = sonic_rs::to_string(&pusher_payload)
82            .map_err(|e| Error::Serialization(format!("Failed to serialize webhook body: {e}")))?;
83
84        let _signature =
85            Token::new(job.app_key.clone(), job.app_secret.clone()).sign(&body_json_string);
86        Ok((pusher_payload, body_json_string))
87    }
88
89    fn event_matches_webhook_filter(&self, event: &Value, filter: Option<&WebhookFilter>) -> bool {
90        let Some(filter) = filter else {
91            return true;
92        };
93
94        let channel = event
95            .get("channel")
96            .and_then(Value::as_str)
97            .unwrap_or_default();
98
99        if let Some(prefix) = &filter.channel_prefix
100            && !channel.starts_with(prefix)
101        {
102            return false;
103        }
104
105        if let Some(suffix) = &filter.channel_suffix
106            && !channel.ends_with(suffix)
107        {
108            return false;
109        }
110
111        if let Some(pattern) = &filter.channel_pattern {
112            let Ok(regex) = regex::Regex::new(pattern) else {
113                warn!(
114                    "Ignoring invalid webhook channel_pattern regex: {}",
115                    pattern
116                );
117                return false;
118            };
119
120            if !regex.is_match(channel) {
121                return false;
122            }
123        }
124
125        true
126    }
127
128    fn filter_events_for_webhook(&self, events: &[Value], webhook_config: &Webhook) -> Vec<Value> {
129        events
130            .iter()
131            .filter(|event| {
132                event
133                    .get("name")
134                    .and_then(Value::as_str)
135                    .is_some_and(|event_name| {
136                        webhook_config.event_types.contains(&event_name.to_string())
137                            && self
138                                .event_matches_webhook_filter(event, webhook_config.filter.as_ref())
139                    })
140            })
141            .cloned()
142            .collect()
143    }
144
145    fn find_relevant_webhooks<'a>(
146        &self,
147        events: &[Value],
148        webhook_configs: &'a [Webhook],
149    ) -> AHashMap<String, (&'a Webhook, Vec<Value>)> {
150        let mut relevant_configs = AHashMap::new();
151
152        for wh_config in webhook_configs {
153            let filtered_events = self.filter_events_for_webhook(events, wh_config);
154            if filtered_events.is_empty() {
155                continue;
156            }
157
158            let key = wh_config
159                .url
160                .as_ref()
161                .map(|u| u.to_string())
162                .or_else(|| wh_config.lambda_function.clone())
163                .or_else(|| wh_config.lambda.as_ref().map(|l| l.function_name.clone()))
164                .unwrap_or_else(String::new);
165
166            if !key.is_empty() {
167                relevant_configs.insert(key, (wh_config, filtered_events));
168            }
169        }
170        relevant_configs
171    }
172
173    pub async fn process_webhook_job(&self, job: JobData) -> Result<()> {
174        let app_id = job.app_id.clone();
175        let app_key = job.app_key.clone();
176        debug!("Processing webhook job for app_id: {}", app_id);
177
178        let app_config = self.get_app_config(&app_id).await?;
179
180        let webhook_configs = match &app_config.webhooks {
181            Some(hooks) => hooks,
182            None => {
183                debug!("No webhooks configured for app: {}", app_id);
184                return Ok(());
185            }
186        };
187
188        self.validate_webhook_job(&app_id, &job.payload.events)
189            .await?;
190
191        let (pusher_payload, _body_json_string) = self.create_pusher_payload(&job)?;
192
193        let relevant_webhooks = self.find_relevant_webhooks(&job.payload.events, webhook_configs);
194        if relevant_webhooks.is_empty() {
195            debug!(
196                "No matching webhook configurations for events in job for app {}",
197                app_id
198            );
199            return Ok(());
200        }
201
202        log_webhook_processing_pusher_format(&app_id, &pusher_payload);
203
204        let mut tasks = Vec::new();
205        for (_endpoint_key, (webhook_config, filtered_events)) in relevant_webhooks {
206            let permit = self
207                .webhook_semaphore
208                .clone()
209                .acquire_owned()
210                .await
211                .map_err(|e| {
212                    Error::Other(format!("Failed to acquire webhook semaphore permit: {e}"))
213                })?;
214
215            let filtered_job = JobData {
216                payload: JobPayload {
217                    time_ms: job.payload.time_ms,
218                    events: filtered_events,
219                },
220                ..job.clone()
221            };
222            let (_, filtered_body_json_string) = self.create_pusher_payload(&filtered_job)?;
223            let filtered_signature = Token::new(job.app_key.clone(), job.app_secret.clone())
224                .sign(&filtered_body_json_string);
225
226            let task = self.create_webhook_task(
227                webhook_config,
228                permit,
229                app_id.clone(),
230                app_key.clone(),
231                filtered_signature,
232                filtered_body_json_string,
233            );
234            tasks.push(task);
235        }
236
237        for task_handle in tasks {
238            if let Err(e) = task_handle.await {
239                error!("Webhook task execution failed: {}", e);
240            }
241        }
242
243        Ok(())
244    }
245
246    fn create_webhook_task(
247        &self,
248        webhook_config: &Webhook,
249        permit: tokio::sync::OwnedSemaphorePermit,
250        app_id: String,
251        app_key: String,
252        signature: String,
253        body_to_send: String,
254    ) -> tokio::task::JoinHandle<()> {
255        if let Some(url) = &webhook_config.url {
256            let params = HttpWebhookTaskParams {
257                url: url.clone(),
258                webhook_config: webhook_config.clone(),
259                permit,
260                app_key,
261                signature,
262                body_to_send,
263            };
264
265            self.create_http_webhook_task(params)
266        } else if webhook_config.lambda.is_some() || webhook_config.lambda_function.is_some() {
267            #[cfg(feature = "lambda")]
268            {
269                self.create_lambda_webhook_task(webhook_config, permit, app_id, body_to_send)
270            }
271            #[cfg(not(feature = "lambda"))]
272            {
273                warn!(
274                    "Lambda webhook configured for app {} but Lambda support not compiled in.",
275                    app_id
276                );
277                drop(permit);
278                tokio::spawn(async {})
279            }
280        } else {
281            warn!(
282                "Webhook for app {} has neither URL nor Lambda config.",
283                app_id
284            );
285            drop(permit);
286            tokio::spawn(async {})
287        }
288    }
289
290    fn create_http_webhook_task(
291        &self,
292        params: HttpWebhookTaskParams,
293    ) -> tokio::task::JoinHandle<()> {
294        let client = self.client.clone();
295        let url_str = params.url.to_string();
296        let custom_headers = params
297            .webhook_config
298            .headers
299            .as_ref()
300            .map(|h| h.headers.clone())
301            .unwrap_or_default();
302
303        tokio::spawn(async move {
304            let _permit = params.permit;
305            if let Err(e) = send_pusher_webhook(
306                &client,
307                &url_str,
308                &params.app_key,
309                &params.signature,
310                params.body_to_send,
311                custom_headers,
312            )
313            .await
314            {
315                error!("Webhook send error to URL {}: {}", url_str, e);
316            } else {
317                debug!("Successfully sent Pusher webhook to URL: {}", url_str);
318            }
319        })
320    }
321
322    #[cfg(feature = "lambda")]
323    fn create_lambda_webhook_task(
324        &self,
325        webhook_config: &Webhook,
326        permit: tokio::sync::OwnedSemaphorePermit,
327        app_id: String,
328        body_to_send: String,
329    ) -> tokio::task::JoinHandle<()> {
330        let lambda_sender = self.lambda_sender.clone();
331        let webhook_clone = webhook_config.clone();
332        let payload_for_lambda: Value = sonic_rs::from_str(&body_to_send).unwrap_or(json!({}));
333
334        tokio::spawn(async move {
335            let _permit = permit;
336            if let Err(e) = lambda_sender
337                .invoke_lambda(&webhook_clone, "batch_events", &app_id, payload_for_lambda)
338                .await
339            {
340                error!("Lambda webhook error for app {}: {}", app_id, e);
341            } else {
342                debug!("Successfully invoked Lambda for app: {}", app_id);
343            }
344        })
345    }
346}
347
348impl Clone for WebhookSender {
349    fn clone(&self) -> Self {
350        Self {
351            client: self.client.clone(),
352            app_manager: self.app_manager.clone(),
353            #[cfg(feature = "lambda")]
354            lambda_sender: self.lambda_sender.clone(),
355            webhook_semaphore: self.webhook_semaphore.clone(),
356        }
357    }
358}
359
/// Upper bound on the total time spent retrying one webhook (5 minutes, per Pusher spec).
const MAX_RETRY_DURATION: Duration = Duration::from_secs(5 * 60);

/// Delay before the first retry.
const INITIAL_RETRY_DELAY: Duration = Duration::from_millis(1_000);
365
366/// Helper function to send a Pusher-formatted webhook with retry and exponential backoff.
367///
368/// On non-2XX responses or network errors, retries with exponential backoff
369/// for up to 5 minutes (per Pusher protocol spec).
370async fn send_pusher_webhook(
371    client: &Client,
372    url: &str,
373    app_key: &str,
374    signature: &str,
375    json_body: String,
376    custom_headers_config: AHashMap<String, String>,
377) -> Result<()> {
378    debug!("Sending Pusher webhook to URL: {}", url);
379
380    let start = tokio::time::Instant::now();
381    let mut delay = INITIAL_RETRY_DELAY;
382    let mut attempt = 0u32;
383
384    loop {
385        attempt += 1;
386        let result = send_pusher_webhook_once(
387            client,
388            url,
389            app_key,
390            signature,
391            &json_body,
392            &custom_headers_config,
393        )
394        .await;
395
396        match result {
397            Ok(()) => return Ok(()),
398            Err(e) => {
399                let elapsed = start.elapsed();
400                if elapsed + delay > MAX_RETRY_DURATION {
401                    error!(
402                        "Webhook to {} failed after {} attempts over {:.1}s, giving up: {}",
403                        url,
404                        attempt,
405                        elapsed.as_secs_f64(),
406                        e
407                    );
408                    return Err(e);
409                }
410
411                warn!(
412                    "Webhook to {} failed (attempt {}), retrying in {:.1}s: {}",
413                    url,
414                    attempt,
415                    delay.as_secs_f64(),
416                    e
417                );
418                tokio::time::sleep(delay).await;
419                delay = (delay * 2).min(Duration::from_secs(60));
420            }
421        }
422    }
423}
424
425/// Single attempt to send a Pusher webhook.
426async fn send_pusher_webhook_once(
427    client: &Client,
428    url: &str,
429    app_key: &str,
430    signature: &str,
431    json_body: &str,
432    custom_headers_config: &AHashMap<String, String>,
433) -> Result<()> {
434    let mut request_builder = client
435        .post(url)
436        .header(header::CONTENT_TYPE, "application/json")
437        .header("X-Pusher-Key", app_key)
438        .header("X-Pusher-Signature", signature);
439
440    for (key, value) in custom_headers_config {
441        request_builder = request_builder.header(key, value);
442    }
443
444    match request_builder.body(json_body.to_string()).send().await {
445        Ok(response) => {
446            let status = response.status();
447            if status.is_success() {
448                info!(
449                    "Successfully sent Pusher webhook to {} (status: {})",
450                    url, status
451                );
452                Ok(())
453            } else {
454                let error_text = response.text().await.unwrap_or_default();
455                error!(
456                    "Pusher webhook to {} failed with status {}: {}",
457                    url, status, error_text
458                );
459                Err(Error::Other(format!(
460                    "Webhook to {url} failed with status {status}"
461                )))
462            }
463        }
464        Err(e) => {
465            error!("Failed to send Pusher webhook to {}: {}", url, e);
466            Err(Error::Other(format!(
467                "HTTP request failed for webhook to {url}: {e}"
468            )))
469        }
470    }
471}
472
473fn log_webhook_processing_pusher_format(app_id: &str, payload: &PusherWebhookPayload) {
474    debug!("Pusher Webhook for app ID: {}", app_id);
475    for event in &payload.events {
476        debug!("  Event: {:?}", event);
477    }
478}