Skip to main content

sockudo_webhook/
sender.rs

1use sockudo_core::app::App;
2use sockudo_core::app::AppManager;
3use sockudo_core::error::{Error, Result};
4
5#[cfg(feature = "lambda")]
6use crate::lambda_sender::LambdaWebhookSender;
7use ahash::AHashMap;
8use reqwest::{Client, header};
9use sockudo_core::token::Token;
10use sockudo_core::webhook_types::{
11    JobData, JobPayload, PusherWebhookPayload, Webhook, WebhookFilter,
12};
13use sonic_rs::Value;
14#[cfg(feature = "lambda")]
15use sonic_rs::json;
16use sonic_rs::prelude::*;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Semaphore;
20use tracing::{debug, error, info, warn};
21
22const MAX_CONCURRENT_WEBHOOKS: usize = 20;
23
24/// Parameters for creating an HTTP webhook task
25struct HttpWebhookTaskParams {
26    url: url::Url,
27    webhook_config: Webhook,
28    permit: tokio::sync::OwnedSemaphorePermit,
29    app_key: String,
30    signature: String,
31    body_to_send: String,
32}
33
34pub struct WebhookSender {
35    client: Client,
36    app_manager: Arc<dyn AppManager + Send + Sync>,
37    #[cfg(feature = "lambda")]
38    lambda_sender: LambdaWebhookSender,
39    webhook_semaphore: Arc<Semaphore>,
40}
41
42impl WebhookSender {
43    pub fn new(app_manager: Arc<dyn AppManager + Send + Sync>) -> Self {
44        let client = Client::builder()
45            .timeout(Duration::from_secs(10))
46            .build()
47            .unwrap_or_default();
48        Self {
49            client,
50            app_manager,
51            #[cfg(feature = "lambda")]
52            lambda_sender: LambdaWebhookSender::new(),
53            webhook_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_WEBHOOKS)),
54        }
55    }
56
57    async fn get_app_config(&self, app_id: &str) -> Result<App> {
58        match self.app_manager.find_by_id(app_id).await? {
59            Some(app) => Ok(app),
60            None => {
61                error!("Webhook: Failed to find app with ID: {}", app_id);
62                Err(Error::InvalidAppKey)
63            }
64        }
65    }
66
67    async fn validate_webhook_job(&self, app_id: &str, events: &[Value]) -> Result<()> {
68        if events.is_empty() {
69            warn!("Webhook job for app {} has no events.", app_id);
70            return Ok(());
71        }
72        Ok(())
73    }
74
75    fn create_pusher_payload(&self, job: &JobData) -> Result<(PusherWebhookPayload, String)> {
76        let pusher_payload = PusherWebhookPayload {
77            time_ms: job.payload.time_ms,
78            events: job.payload.events.clone(),
79        };
80
81        let body_json_string = sonic_rs::to_string(&pusher_payload)
82            .map_err(|e| Error::Serialization(format!("Failed to serialize webhook body: {e}")))?;
83
84        let _signature =
85            Token::new(job.app_key.clone(), job.app_secret.clone()).sign(&body_json_string);
86        Ok((pusher_payload, body_json_string))
87    }
88
89    fn event_matches_webhook_filter(&self, event: &Value, filter: Option<&WebhookFilter>) -> bool {
90        let Some(filter) = filter else {
91            return true;
92        };
93
94        let channel = event
95            .get("channel")
96            .and_then(Value::as_str)
97            .unwrap_or_default();
98
99        if let Some(prefix) = &filter.channel_prefix
100            && !channel.starts_with(prefix)
101        {
102            return false;
103        }
104
105        if let Some(suffix) = &filter.channel_suffix
106            && !channel.ends_with(suffix)
107        {
108            return false;
109        }
110
111        if let Some(pattern) = &filter.channel_pattern {
112            let Ok(regex) = regex::Regex::new(pattern) else {
113                warn!(
114                    "Ignoring invalid webhook channel_pattern regex: {}",
115                    pattern
116                );
117                return false;
118            };
119
120            if !regex.is_match(channel) {
121                return false;
122            }
123        }
124
125        true
126    }
127
128    fn filter_events_for_webhook(&self, events: &[Value], webhook_config: &Webhook) -> Vec<Value> {
129        events
130            .iter()
131            .filter(|event| {
132                event
133                    .get("name")
134                    .and_then(Value::as_str)
135                    .is_some_and(|event_name| {
136                        webhook_config.event_types.contains(&event_name.to_string())
137                            && self
138                                .event_matches_webhook_filter(event, webhook_config.filter.as_ref())
139                    })
140            })
141            .cloned()
142            .collect()
143    }
144
145    fn find_relevant_webhooks<'a>(
146        &self,
147        events: &[Value],
148        webhook_configs: &'a [Webhook],
149    ) -> AHashMap<String, (&'a Webhook, Vec<Value>)> {
150        let mut relevant_configs = AHashMap::new();
151
152        for wh_config in webhook_configs {
153            let filtered_events = self.filter_events_for_webhook(events, wh_config);
154            if filtered_events.is_empty() {
155                continue;
156            }
157
158            let key = wh_config
159                .url
160                .as_ref()
161                .map(|u| u.to_string())
162                .or_else(|| wh_config.lambda_function.clone())
163                .or_else(|| wh_config.lambda.as_ref().map(|l| l.function_name.clone()))
164                .unwrap_or_else(String::new);
165
166            if !key.is_empty() {
167                relevant_configs.insert(key, (wh_config, filtered_events));
168            }
169        }
170        relevant_configs
171    }
172
173    pub async fn process_webhook_job(&self, job: JobData) -> Result<()> {
174        let app_id = job.app_id.clone();
175        let app_key = job.app_key.clone();
176        debug!("Processing webhook job for app_id: {}", app_id);
177
178        let app_config = self.get_app_config(&app_id).await?;
179
180        let webhook_configs = match &app_config.webhooks {
181            Some(hooks) => hooks,
182            None => {
183                debug!("No webhooks configured for app: {}", app_id);
184                return Ok(());
185            }
186        };
187
188        self.validate_webhook_job(&app_id, &job.payload.events)
189            .await?;
190
191        let (pusher_payload, _body_json_string) = self.create_pusher_payload(&job)?;
192
193        let relevant_webhooks = self.find_relevant_webhooks(&job.payload.events, webhook_configs);
194        if relevant_webhooks.is_empty() {
195            debug!(
196                "No matching webhook configurations for events in job for app {}",
197                app_id
198            );
199            return Ok(());
200        }
201
202        log_webhook_processing_pusher_format(&app_id, &pusher_payload);
203
204        let mut tasks = Vec::new();
205        for (_endpoint_key, (webhook_config, filtered_events)) in relevant_webhooks {
206            let permit = self
207                .webhook_semaphore
208                .clone()
209                .acquire_owned()
210                .await
211                .map_err(|e| {
212                    Error::Other(format!("Failed to acquire webhook semaphore permit: {e}"))
213                })?;
214
215            let filtered_job = JobData {
216                payload: JobPayload {
217                    time_ms: job.payload.time_ms,
218                    events: filtered_events,
219                },
220                ..job.clone()
221            };
222            let (_, filtered_body_json_string) = self.create_pusher_payload(&filtered_job)?;
223            let filtered_signature = Token::new(job.app_key.clone(), job.app_secret.clone())
224                .sign(&filtered_body_json_string);
225
226            let task = self.create_webhook_task(
227                webhook_config,
228                permit,
229                app_id.clone(),
230                app_key.clone(),
231                filtered_signature,
232                filtered_body_json_string,
233            );
234            tasks.push(task);
235        }
236
237        for task_handle in tasks {
238            if let Err(e) = task_handle.await {
239                error!("Webhook task execution failed: {}", e);
240            }
241        }
242
243        Ok(())
244    }
245
246    fn create_webhook_task(
247        &self,
248        webhook_config: &Webhook,
249        permit: tokio::sync::OwnedSemaphorePermit,
250        app_id: String,
251        app_key: String,
252        signature: String,
253        body_to_send: String,
254    ) -> tokio::task::JoinHandle<()> {
255        if let Some(url) = &webhook_config.url {
256            let params = HttpWebhookTaskParams {
257                url: url.clone(),
258                webhook_config: webhook_config.clone(),
259                permit,
260                app_key,
261                signature,
262                body_to_send,
263            };
264
265            self.create_http_webhook_task(params)
266        } else if webhook_config.lambda.is_some() || webhook_config.lambda_function.is_some() {
267            #[cfg(feature = "lambda")]
268            {
269                self.create_lambda_webhook_task(webhook_config, permit, app_id, body_to_send)
270            }
271            #[cfg(not(feature = "lambda"))]
272            {
273                warn!(
274                    "Lambda webhook configured for app {} but Lambda support not compiled in.",
275                    app_id
276                );
277                drop(permit);
278                tokio::spawn(async {})
279            }
280        } else {
281            warn!(
282                "Webhook for app {} has neither URL nor Lambda config.",
283                app_id
284            );
285            drop(permit);
286            tokio::spawn(async {})
287        }
288    }
289
290    fn create_http_webhook_task(
291        &self,
292        params: HttpWebhookTaskParams,
293    ) -> tokio::task::JoinHandle<()> {
294        let client = self.client.clone();
295        let url_str = params.url.to_string();
296        let custom_headers = params
297            .webhook_config
298            .headers
299            .as_ref()
300            .map(|h| h.headers.clone())
301            .unwrap_or_default();
302
303        tokio::spawn(async move {
304            let _permit = params.permit;
305            if let Err(e) = send_pusher_webhook(
306                &client,
307                &url_str,
308                &params.app_key,
309                &params.signature,
310                params.body_to_send,
311                custom_headers,
312            )
313            .await
314            {
315                error!("Webhook send error to URL {}: {}", url_str, e);
316            } else {
317                debug!("Successfully sent Pusher webhook to URL: {}", url_str);
318            }
319        })
320    }
321
322    #[cfg(feature = "lambda")]
323    fn create_lambda_webhook_task(
324        &self,
325        webhook_config: &Webhook,
326        permit: tokio::sync::OwnedSemaphorePermit,
327        app_id: String,
328        body_to_send: String,
329    ) -> tokio::task::JoinHandle<()> {
330        let lambda_sender = self.lambda_sender.clone();
331        let webhook_clone = webhook_config.clone();
332        let payload_for_lambda: Value = sonic_rs::from_str(&body_to_send).unwrap_or(json!({}));
333
334        tokio::spawn(async move {
335            let _permit = permit;
336            if let Err(e) = lambda_sender
337                .invoke_lambda(&webhook_clone, "batch_events", &app_id, payload_for_lambda)
338                .await
339            {
340                error!("Lambda webhook error for app {}: {}", app_id, e);
341            } else {
342                debug!("Successfully invoked Lambda for app: {}", app_id);
343            }
344        })
345    }
346}
347
348impl Clone for WebhookSender {
349    fn clone(&self) -> Self {
350        Self {
351            client: self.client.clone(),
352            app_manager: self.app_manager.clone(),
353            #[cfg(feature = "lambda")]
354            lambda_sender: self.lambda_sender.clone(),
355            webhook_semaphore: self.webhook_semaphore.clone(),
356        }
357    }
358}
359
360/// Maximum total retry duration (5 minutes) per Pusher spec.
361const MAX_RETRY_DURATION: Duration = Duration::from_secs(300);
362
363/// Initial retry delay.
364const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);
365
366/// Helper function to send a Pusher-formatted webhook with retry and exponential backoff.
367///
368/// On non-2XX responses or network errors, retries with exponential backoff
369/// for up to 5 minutes (per Pusher protocol spec).
370async fn send_pusher_webhook(
371    client: &Client,
372    url: &str,
373    app_key: &str,
374    signature: &str,
375    json_body: String,
376    custom_headers_config: AHashMap<String, String>,
377) -> Result<()> {
378    debug!("Sending Pusher webhook to URL: {}", url);
379
380    let start = tokio::time::Instant::now();
381    let mut delay = INITIAL_RETRY_DELAY;
382    let mut attempt = 0u32;
383
384    loop {
385        attempt += 1;
386        let result = send_pusher_webhook_once(
387            client,
388            url,
389            app_key,
390            signature,
391            &json_body,
392            &custom_headers_config,
393        )
394        .await;
395
396        match result {
397            Ok(()) => return Ok(()),
398            Err(e) => {
399                let elapsed = start.elapsed();
400                if elapsed + delay > MAX_RETRY_DURATION {
401                    error!(
402                        "Webhook to {} failed after {} attempts over {:.1}s, giving up: {}",
403                        url,
404                        attempt,
405                        elapsed.as_secs_f64(),
406                        e
407                    );
408                    return Err(e);
409                }
410
411                warn!(
412                    "Webhook to {} failed (attempt {}), retrying in {:.1}s: {}",
413                    url,
414                    attempt,
415                    delay.as_secs_f64(),
416                    e
417                );
418                tokio::time::sleep(delay).await;
419                delay = (delay * 2).min(Duration::from_secs(60));
420            }
421        }
422    }
423}
424
425/// Single attempt to send a Pusher webhook.
426async fn send_pusher_webhook_once(
427    client: &Client,
428    url: &str,
429    app_key: &str,
430    signature: &str,
431    json_body: &str,
432    custom_headers_config: &AHashMap<String, String>,
433) -> Result<()> {
434    let mut request_builder = client
435        .post(url)
436        .header(header::CONTENT_TYPE, "application/json")
437        .header("X-Pusher-Key", app_key)
438        .header("X-Pusher-Signature", signature);
439
440    for (key, value) in custom_headers_config {
441        request_builder = request_builder.header(key, value);
442    }
443
444    match request_builder.body(json_body.to_string()).send().await {
445        Ok(response) => {
446            let status = response.status();
447            if status.is_success() {
448                info!(
449                    "Successfully sent Pusher webhook to {} (status: {})",
450                    url, status
451                );
452                Ok(())
453            } else {
454                let error_text = response.text().await.unwrap_or_default();
455                error!(
456                    "Pusher webhook to {} failed with status {}: {}",
457                    url, status, error_text
458                );
459                Err(Error::Other(format!(
460                    "Webhook to {url} failed with status {status}"
461                )))
462            }
463        }
464        Err(e) => {
465            error!("Failed to send Pusher webhook to {}: {}", url, e);
466            Err(Error::Other(format!(
467                "HTTP request failed for webhook to {url}: {e}"
468            )))
469        }
470    }
471}
472
473fn log_webhook_processing_pusher_format(app_id: &str, payload: &PusherWebhookPayload) {
474    debug!("Pusher Webhook for app ID: {}", app_id);
475    for event in &payload.events {
476        debug!("  Event: {:?}", event);
477    }
478}
479
480#[cfg(test)]
481mod tests {
482    use sockudo_app::memory_app_manager::MemoryAppManager;
483    use sockudo_core::app::{App, AppManager};
484    use sockudo_core::webhook_types::JobPayload;
485
486    use super::*;
487
488    #[tokio::test]
489    async fn test_creating_webhook_sender() {
490        let webhook_sender = WebhookSender::new(Arc::new(MemoryAppManager::new()));
491        assert!(webhook_sender.webhook_semaphore.available_permits() > 0);
492        assert!(webhook_sender.app_manager.get_apps().await.is_ok());
493    }
494
495    #[tokio::test]
496    async fn test_process_webhook_job_no_events() {
497        let app_manager = Arc::new(MemoryAppManager::new());
498        let app = App {
499            id: "test_app".to_string(),
500            key: "test_key".to_string(),
501            secret: "test_secret".to_string(),
502            max_connections: 100,
503            enable_client_messages: true,
504            enabled: true,
505            max_client_events_per_second: 100,
506            ..Default::default()
507        };
508        app_manager.create_app(app).await.unwrap();
509        let webhook_sender = WebhookSender::new(app_manager.clone());
510
511        let job = JobData {
512            app_id: "test_app".to_string(),
513            app_key: "test_key".to_string(),
514            app_secret: "test_secret".to_string(),
515            payload: JobPayload {
516                time_ms: 1234567890,
517                events: vec![],
518            },
519            original_signature: "test_signature".to_string(),
520        };
521
522        let result = webhook_sender.process_webhook_job(job).await;
523        assert!(result.is_ok());
524    }
525
526    #[tokio::test]
527    async fn test_process_webhook_job_with_events() {
528        let app_manager = Arc::new(MemoryAppManager::new());
529        let app = App {
530            id: "test_app".to_string(),
531            key: "test_key".to_string(),
532            secret: "test_secret".to_string(),
533            max_connections: 100,
534            enable_client_messages: true,
535            enabled: true,
536            max_client_events_per_second: 100,
537            ..Default::default()
538        };
539        app_manager.create_app(app).await.unwrap();
540        let webhook_sender = WebhookSender::new(app_manager.clone());
541
542        let job = JobData {
543            app_id: "test_app".to_string(),
544            app_key: "test_key".to_string(),
545            app_secret: "test_secret".to_string(),
546            payload: JobPayload {
547                time_ms: 1234567890,
548                events: vec![sonic_rs::json!({
549                    "name": "channel_occupied",
550                    "channel": "test-channel"
551                })],
552            },
553            original_signature: "test_signature".to_string(),
554        };
555
556        let result = webhook_sender.process_webhook_job(job).await;
557        assert!(result.is_ok());
558    }
559
560    #[tokio::test]
561    async fn test_process_webhook_job_invalid_app() {
562        let app_manager = Arc::new(MemoryAppManager::new());
563        let webhook_sender = WebhookSender::new(app_manager.clone());
564
565        let job = JobData {
566            app_id: "non_existent_app".to_string(),
567            app_key: "test_key".to_string(),
568            app_secret: "test_secret".to_string(),
569            payload: JobPayload {
570                time_ms: 1234567890,
571                events: vec![],
572            },
573            original_signature: "test_signature".to_string(),
574        };
575
576        let result = webhook_sender.process_webhook_job(job).await;
577        assert!(result.is_err());
578    }
579
580    #[tokio::test]
581    async fn test_process_webhook_job_concurrent_requests() {
582        let app_manager = Arc::new(MemoryAppManager::new());
583        let app = App {
584            id: "test_app".to_string(),
585            key: "test_key".to_string(),
586            secret: "test_secret".to_string(),
587            max_connections: 100,
588            enable_client_messages: true,
589            enabled: true,
590            max_client_events_per_second: 100,
591            ..Default::default()
592        };
593        app_manager.create_app(app).await.unwrap();
594        let webhook_sender = Arc::new(WebhookSender::new(app_manager.clone()));
595
596        let mut handles = vec![];
597        for i in 0..10 {
598            let sender_clone = webhook_sender.clone();
599            let job = JobData {
600                app_id: "test_app".to_string(),
601                app_key: "test_key".to_string(),
602                app_secret: "test_secret".to_string(),
603                payload: JobPayload {
604                    time_ms: 1234567890 + i,
605                    events: vec![sonic_rs::json!({
606                        "name": "channel_occupied",
607                        "channel": format!("test-channel-{}", i)
608                    })],
609                },
610                original_signature: format!("test_signature_{i}"),
611            };
612
613            handles.push(tokio::spawn(async move {
614                sender_clone.process_webhook_job(job).await
615            }));
616        }
617
618        let results = futures::future::join_all(handles).await;
619        for result in results {
620            assert!(result.unwrap().is_ok());
621        }
622    }
623
624    #[test]
625    fn test_filter_events_for_webhook_respects_channel_prefix() {
626        let webhook_sender = WebhookSender::new(Arc::new(MemoryAppManager::new()));
627        let webhook = Webhook {
628            url: Some(url::Url::parse("http://localhost/webhook").unwrap()),
629            lambda_function: None,
630            lambda: None,
631            event_types: vec!["channel_occupied".to_string()],
632            filter: Some(WebhookFilter {
633                channel_prefix: Some("#server-to-user".to_string()),
634                channel_suffix: None,
635                channel_pattern: None,
636            }),
637            headers: None,
638        };
639
640        let filtered = webhook_sender.filter_events_for_webhook(
641            &[
642                sonic_rs::json!({
643                    "name": "channel_occupied",
644                    "channel": "#server-to-user-123"
645                }),
646                sonic_rs::json!({
647                    "name": "channel_occupied",
648                    "channel": "private-conversation.123"
649                }),
650            ],
651            &webhook,
652        );
653
654        assert_eq!(filtered.len(), 1);
655        assert_eq!(
656            filtered[0].get("channel").and_then(Value::as_str),
657            Some("#server-to-user-123")
658        );
659    }
660
661    #[test]
662    fn test_find_relevant_webhooks_splits_events_per_endpoint() {
663        let webhook_sender = WebhookSender::new(Arc::new(MemoryAppManager::new()));
664        let prefixed = Webhook {
665            url: Some(url::Url::parse("http://localhost/prefix").unwrap()),
666            lambda_function: None,
667            lambda: None,
668            event_types: vec!["channel_occupied".to_string()],
669            filter: Some(WebhookFilter {
670                channel_prefix: Some("#server-to-user".to_string()),
671                channel_suffix: None,
672                channel_pattern: None,
673            }),
674            headers: None,
675        };
676        let catch_all = Webhook {
677            url: Some(url::Url::parse("http://localhost/all").unwrap()),
678            lambda_function: None,
679            lambda: None,
680            event_types: vec!["channel_occupied".to_string()],
681            filter: None,
682            headers: None,
683        };
684
685        let webhooks = [prefixed.clone(), catch_all.clone()];
686        let relevant = webhook_sender.find_relevant_webhooks(
687            &[
688                sonic_rs::json!({
689                    "name": "channel_occupied",
690                    "channel": "#server-to-user-1"
691                }),
692                sonic_rs::json!({
693                    "name": "channel_occupied",
694                    "channel": "private-conversation.1"
695                }),
696            ],
697            &webhooks,
698        );
699
700        assert_eq!(relevant.len(), 2);
701        assert_eq!(relevant.get("http://localhost/prefix").unwrap().1.len(), 1);
702        assert_eq!(relevant.get("http://localhost/all").unwrap().1.len(), 2);
703    }
704}