1use sockudo_core::app::App;
2use sockudo_core::app::AppManager;
3use sockudo_core::error::{Error, Result};
4
5#[cfg(feature = "lambda")]
6use crate::lambda_sender::LambdaWebhookSender;
7use ahash::AHashMap;
8use reqwest::{Client, header};
9use sockudo_core::token::Token;
10use sockudo_core::webhook_types::{
11 JobData, JobPayload, PusherWebhookPayload, Webhook, WebhookFilter,
12};
13use sonic_rs::Value;
14#[cfg(feature = "lambda")]
15use sonic_rs::json;
16use sonic_rs::prelude::*;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::Semaphore;
20use tracing::{debug, error, info, warn};
21
22const MAX_CONCURRENT_WEBHOOKS: usize = 20;
23
24struct HttpWebhookTaskParams {
26 url: url::Url,
27 webhook_config: Webhook,
28 permit: tokio::sync::OwnedSemaphorePermit,
29 app_key: String,
30 signature: String,
31 body_to_send: String,
32}
33
34pub struct WebhookSender {
35 client: Client,
36 app_manager: Arc<dyn AppManager + Send + Sync>,
37 #[cfg(feature = "lambda")]
38 lambda_sender: LambdaWebhookSender,
39 webhook_semaphore: Arc<Semaphore>,
40}
41
42impl WebhookSender {
43 pub fn new(app_manager: Arc<dyn AppManager + Send + Sync>) -> Self {
44 let client = Client::builder()
45 .timeout(Duration::from_secs(10))
46 .build()
47 .unwrap_or_default();
48 Self {
49 client,
50 app_manager,
51 #[cfg(feature = "lambda")]
52 lambda_sender: LambdaWebhookSender::new(),
53 webhook_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_WEBHOOKS)),
54 }
55 }
56
57 async fn get_app_config(&self, app_id: &str) -> Result<App> {
58 match self.app_manager.find_by_id(app_id).await? {
59 Some(app) => Ok(app),
60 None => {
61 error!("Webhook: Failed to find app with ID: {}", app_id);
62 Err(Error::InvalidAppKey)
63 }
64 }
65 }
66
67 async fn validate_webhook_job(&self, app_id: &str, events: &[Value]) -> Result<()> {
68 if events.is_empty() {
69 warn!("Webhook job for app {} has no events.", app_id);
70 return Ok(());
71 }
72 Ok(())
73 }
74
75 fn create_pusher_payload(&self, job: &JobData) -> Result<(PusherWebhookPayload, String)> {
76 let pusher_payload = PusherWebhookPayload {
77 time_ms: job.payload.time_ms,
78 events: job.payload.events.clone(),
79 };
80
81 let body_json_string = sonic_rs::to_string(&pusher_payload)
82 .map_err(|e| Error::Serialization(format!("Failed to serialize webhook body: {e}")))?;
83
84 let _signature =
85 Token::new(job.app_key.clone(), job.app_secret.clone()).sign(&body_json_string);
86 Ok((pusher_payload, body_json_string))
87 }
88
89 fn event_matches_webhook_filter(&self, event: &Value, filter: Option<&WebhookFilter>) -> bool {
90 let Some(filter) = filter else {
91 return true;
92 };
93
94 let channel = event
95 .get("channel")
96 .and_then(Value::as_str)
97 .unwrap_or_default();
98
99 if let Some(prefix) = &filter.channel_prefix
100 && !channel.starts_with(prefix)
101 {
102 return false;
103 }
104
105 if let Some(suffix) = &filter.channel_suffix
106 && !channel.ends_with(suffix)
107 {
108 return false;
109 }
110
111 if let Some(pattern) = &filter.channel_pattern {
112 let Ok(regex) = regex::Regex::new(pattern) else {
113 warn!(
114 "Ignoring invalid webhook channel_pattern regex: {}",
115 pattern
116 );
117 return false;
118 };
119
120 if !regex.is_match(channel) {
121 return false;
122 }
123 }
124
125 true
126 }
127
128 fn filter_events_for_webhook(&self, events: &[Value], webhook_config: &Webhook) -> Vec<Value> {
129 events
130 .iter()
131 .filter(|event| {
132 event
133 .get("name")
134 .and_then(Value::as_str)
135 .is_some_and(|event_name| {
136 webhook_config.event_types.contains(&event_name.to_string())
137 && self
138 .event_matches_webhook_filter(event, webhook_config.filter.as_ref())
139 })
140 })
141 .cloned()
142 .collect()
143 }
144
145 fn find_relevant_webhooks<'a>(
146 &self,
147 events: &[Value],
148 webhook_configs: &'a [Webhook],
149 ) -> AHashMap<String, (&'a Webhook, Vec<Value>)> {
150 let mut relevant_configs = AHashMap::new();
151
152 for wh_config in webhook_configs {
153 let filtered_events = self.filter_events_for_webhook(events, wh_config);
154 if filtered_events.is_empty() {
155 continue;
156 }
157
158 let key = wh_config
159 .url
160 .as_ref()
161 .map(|u| u.to_string())
162 .or_else(|| wh_config.lambda_function.clone())
163 .or_else(|| wh_config.lambda.as_ref().map(|l| l.function_name.clone()))
164 .unwrap_or_else(String::new);
165
166 if !key.is_empty() {
167 relevant_configs.insert(key, (wh_config, filtered_events));
168 }
169 }
170 relevant_configs
171 }
172
173 pub async fn process_webhook_job(&self, job: JobData) -> Result<()> {
174 let app_id = job.app_id.clone();
175 let app_key = job.app_key.clone();
176 debug!("Processing webhook job for app_id: {}", app_id);
177
178 let app_config = self.get_app_config(&app_id).await?;
179
180 let webhook_configs = match &app_config.webhooks {
181 Some(hooks) => hooks,
182 None => {
183 debug!("No webhooks configured for app: {}", app_id);
184 return Ok(());
185 }
186 };
187
188 self.validate_webhook_job(&app_id, &job.payload.events)
189 .await?;
190
191 let (pusher_payload, _body_json_string) = self.create_pusher_payload(&job)?;
192
193 let relevant_webhooks = self.find_relevant_webhooks(&job.payload.events, webhook_configs);
194 if relevant_webhooks.is_empty() {
195 debug!(
196 "No matching webhook configurations for events in job for app {}",
197 app_id
198 );
199 return Ok(());
200 }
201
202 log_webhook_processing_pusher_format(&app_id, &pusher_payload);
203
204 let mut tasks = Vec::new();
205 for (_endpoint_key, (webhook_config, filtered_events)) in relevant_webhooks {
206 let permit = self
207 .webhook_semaphore
208 .clone()
209 .acquire_owned()
210 .await
211 .map_err(|e| {
212 Error::Other(format!("Failed to acquire webhook semaphore permit: {e}"))
213 })?;
214
215 let filtered_job = JobData {
216 payload: JobPayload {
217 time_ms: job.payload.time_ms,
218 events: filtered_events,
219 },
220 ..job.clone()
221 };
222 let (_, filtered_body_json_string) = self.create_pusher_payload(&filtered_job)?;
223 let filtered_signature = Token::new(job.app_key.clone(), job.app_secret.clone())
224 .sign(&filtered_body_json_string);
225
226 let task = self.create_webhook_task(
227 webhook_config,
228 permit,
229 app_id.clone(),
230 app_key.clone(),
231 filtered_signature,
232 filtered_body_json_string,
233 );
234 tasks.push(task);
235 }
236
237 for task_handle in tasks {
238 if let Err(e) = task_handle.await {
239 error!("Webhook task execution failed: {}", e);
240 }
241 }
242
243 Ok(())
244 }
245
246 fn create_webhook_task(
247 &self,
248 webhook_config: &Webhook,
249 permit: tokio::sync::OwnedSemaphorePermit,
250 app_id: String,
251 app_key: String,
252 signature: String,
253 body_to_send: String,
254 ) -> tokio::task::JoinHandle<()> {
255 if let Some(url) = &webhook_config.url {
256 let params = HttpWebhookTaskParams {
257 url: url.clone(),
258 webhook_config: webhook_config.clone(),
259 permit,
260 app_key,
261 signature,
262 body_to_send,
263 };
264
265 self.create_http_webhook_task(params)
266 } else if webhook_config.lambda.is_some() || webhook_config.lambda_function.is_some() {
267 #[cfg(feature = "lambda")]
268 {
269 self.create_lambda_webhook_task(webhook_config, permit, app_id, body_to_send)
270 }
271 #[cfg(not(feature = "lambda"))]
272 {
273 warn!(
274 "Lambda webhook configured for app {} but Lambda support not compiled in.",
275 app_id
276 );
277 drop(permit);
278 tokio::spawn(async {})
279 }
280 } else {
281 warn!(
282 "Webhook for app {} has neither URL nor Lambda config.",
283 app_id
284 );
285 drop(permit);
286 tokio::spawn(async {})
287 }
288 }
289
290 fn create_http_webhook_task(
291 &self,
292 params: HttpWebhookTaskParams,
293 ) -> tokio::task::JoinHandle<()> {
294 let client = self.client.clone();
295 let url_str = params.url.to_string();
296 let custom_headers = params
297 .webhook_config
298 .headers
299 .as_ref()
300 .map(|h| h.headers.clone())
301 .unwrap_or_default();
302
303 tokio::spawn(async move {
304 let _permit = params.permit;
305 if let Err(e) = send_pusher_webhook(
306 &client,
307 &url_str,
308 ¶ms.app_key,
309 ¶ms.signature,
310 params.body_to_send,
311 custom_headers,
312 )
313 .await
314 {
315 error!("Webhook send error to URL {}: {}", url_str, e);
316 } else {
317 debug!("Successfully sent Pusher webhook to URL: {}", url_str);
318 }
319 })
320 }
321
322 #[cfg(feature = "lambda")]
323 fn create_lambda_webhook_task(
324 &self,
325 webhook_config: &Webhook,
326 permit: tokio::sync::OwnedSemaphorePermit,
327 app_id: String,
328 body_to_send: String,
329 ) -> tokio::task::JoinHandle<()> {
330 let lambda_sender = self.lambda_sender.clone();
331 let webhook_clone = webhook_config.clone();
332 let payload_for_lambda: Value = sonic_rs::from_str(&body_to_send).unwrap_or(json!({}));
333
334 tokio::spawn(async move {
335 let _permit = permit;
336 if let Err(e) = lambda_sender
337 .invoke_lambda(&webhook_clone, "batch_events", &app_id, payload_for_lambda)
338 .await
339 {
340 error!("Lambda webhook error for app {}: {}", app_id, e);
341 } else {
342 debug!("Successfully invoked Lambda for app: {}", app_id);
343 }
344 })
345 }
346}
347
348impl Clone for WebhookSender {
349 fn clone(&self) -> Self {
350 Self {
351 client: self.client.clone(),
352 app_manager: self.app_manager.clone(),
353 #[cfg(feature = "lambda")]
354 lambda_sender: self.lambda_sender.clone(),
355 webhook_semaphore: self.webhook_semaphore.clone(),
356 }
357 }
358}
359
360const MAX_RETRY_DURATION: Duration = Duration::from_secs(300);
362
363const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1);
365
366async fn send_pusher_webhook(
371 client: &Client,
372 url: &str,
373 app_key: &str,
374 signature: &str,
375 json_body: String,
376 custom_headers_config: AHashMap<String, String>,
377) -> Result<()> {
378 debug!("Sending Pusher webhook to URL: {}", url);
379
380 let start = tokio::time::Instant::now();
381 let mut delay = INITIAL_RETRY_DELAY;
382 let mut attempt = 0u32;
383
384 loop {
385 attempt += 1;
386 let result = send_pusher_webhook_once(
387 client,
388 url,
389 app_key,
390 signature,
391 &json_body,
392 &custom_headers_config,
393 )
394 .await;
395
396 match result {
397 Ok(()) => return Ok(()),
398 Err(e) => {
399 let elapsed = start.elapsed();
400 if elapsed + delay > MAX_RETRY_DURATION {
401 error!(
402 "Webhook to {} failed after {} attempts over {:.1}s, giving up: {}",
403 url,
404 attempt,
405 elapsed.as_secs_f64(),
406 e
407 );
408 return Err(e);
409 }
410
411 warn!(
412 "Webhook to {} failed (attempt {}), retrying in {:.1}s: {}",
413 url,
414 attempt,
415 delay.as_secs_f64(),
416 e
417 );
418 tokio::time::sleep(delay).await;
419 delay = (delay * 2).min(Duration::from_secs(60));
420 }
421 }
422 }
423}
424
425async fn send_pusher_webhook_once(
427 client: &Client,
428 url: &str,
429 app_key: &str,
430 signature: &str,
431 json_body: &str,
432 custom_headers_config: &AHashMap<String, String>,
433) -> Result<()> {
434 let mut request_builder = client
435 .post(url)
436 .header(header::CONTENT_TYPE, "application/json")
437 .header("X-Pusher-Key", app_key)
438 .header("X-Pusher-Signature", signature);
439
440 for (key, value) in custom_headers_config {
441 request_builder = request_builder.header(key, value);
442 }
443
444 match request_builder.body(json_body.to_string()).send().await {
445 Ok(response) => {
446 let status = response.status();
447 if status.is_success() {
448 info!(
449 "Successfully sent Pusher webhook to {} (status: {})",
450 url, status
451 );
452 Ok(())
453 } else {
454 let error_text = response.text().await.unwrap_or_default();
455 error!(
456 "Pusher webhook to {} failed with status {}: {}",
457 url, status, error_text
458 );
459 Err(Error::Other(format!(
460 "Webhook to {url} failed with status {status}"
461 )))
462 }
463 }
464 Err(e) => {
465 error!("Failed to send Pusher webhook to {}: {}", url, e);
466 Err(Error::Other(format!(
467 "HTTP request failed for webhook to {url}: {e}"
468 )))
469 }
470 }
471}
472
473fn log_webhook_processing_pusher_format(app_id: &str, payload: &PusherWebhookPayload) {
474 debug!("Pusher Webhook for app ID: {}", app_id);
475 for event in &payload.events {
476 debug!(" Event: {:?}", event);
477 }
478}
479
480#[cfg(test)]
481mod tests {
482 use sockudo_app::memory_app_manager::MemoryAppManager;
483 use sockudo_core::app::{App, AppManager};
484 use sockudo_core::webhook_types::JobPayload;
485
486 use super::*;
487
488 #[tokio::test]
489 async fn test_creating_webhook_sender() {
490 let webhook_sender = WebhookSender::new(Arc::new(MemoryAppManager::new()));
491 assert!(webhook_sender.webhook_semaphore.available_permits() > 0);
492 assert!(webhook_sender.app_manager.get_apps().await.is_ok());
493 }
494
495 #[tokio::test]
496 async fn test_process_webhook_job_no_events() {
497 let app_manager = Arc::new(MemoryAppManager::new());
498 let app = App {
499 id: "test_app".to_string(),
500 key: "test_key".to_string(),
501 secret: "test_secret".to_string(),
502 max_connections: 100,
503 enable_client_messages: true,
504 enabled: true,
505 max_client_events_per_second: 100,
506 ..Default::default()
507 };
508 app_manager.create_app(app).await.unwrap();
509 let webhook_sender = WebhookSender::new(app_manager.clone());
510
511 let job = JobData {
512 app_id: "test_app".to_string(),
513 app_key: "test_key".to_string(),
514 app_secret: "test_secret".to_string(),
515 payload: JobPayload {
516 time_ms: 1234567890,
517 events: vec![],
518 },
519 original_signature: "test_signature".to_string(),
520 };
521
522 let result = webhook_sender.process_webhook_job(job).await;
523 assert!(result.is_ok());
524 }
525
526 #[tokio::test]
527 async fn test_process_webhook_job_with_events() {
528 let app_manager = Arc::new(MemoryAppManager::new());
529 let app = App {
530 id: "test_app".to_string(),
531 key: "test_key".to_string(),
532 secret: "test_secret".to_string(),
533 max_connections: 100,
534 enable_client_messages: true,
535 enabled: true,
536 max_client_events_per_second: 100,
537 ..Default::default()
538 };
539 app_manager.create_app(app).await.unwrap();
540 let webhook_sender = WebhookSender::new(app_manager.clone());
541
542 let job = JobData {
543 app_id: "test_app".to_string(),
544 app_key: "test_key".to_string(),
545 app_secret: "test_secret".to_string(),
546 payload: JobPayload {
547 time_ms: 1234567890,
548 events: vec![sonic_rs::json!({
549 "name": "channel_occupied",
550 "channel": "test-channel"
551 })],
552 },
553 original_signature: "test_signature".to_string(),
554 };
555
556 let result = webhook_sender.process_webhook_job(job).await;
557 assert!(result.is_ok());
558 }
559
560 #[tokio::test]
561 async fn test_process_webhook_job_invalid_app() {
562 let app_manager = Arc::new(MemoryAppManager::new());
563 let webhook_sender = WebhookSender::new(app_manager.clone());
564
565 let job = JobData {
566 app_id: "non_existent_app".to_string(),
567 app_key: "test_key".to_string(),
568 app_secret: "test_secret".to_string(),
569 payload: JobPayload {
570 time_ms: 1234567890,
571 events: vec![],
572 },
573 original_signature: "test_signature".to_string(),
574 };
575
576 let result = webhook_sender.process_webhook_job(job).await;
577 assert!(result.is_err());
578 }
579
580 #[tokio::test]
581 async fn test_process_webhook_job_concurrent_requests() {
582 let app_manager = Arc::new(MemoryAppManager::new());
583 let app = App {
584 id: "test_app".to_string(),
585 key: "test_key".to_string(),
586 secret: "test_secret".to_string(),
587 max_connections: 100,
588 enable_client_messages: true,
589 enabled: true,
590 max_client_events_per_second: 100,
591 ..Default::default()
592 };
593 app_manager.create_app(app).await.unwrap();
594 let webhook_sender = Arc::new(WebhookSender::new(app_manager.clone()));
595
596 let mut handles = vec![];
597 for i in 0..10 {
598 let sender_clone = webhook_sender.clone();
599 let job = JobData {
600 app_id: "test_app".to_string(),
601 app_key: "test_key".to_string(),
602 app_secret: "test_secret".to_string(),
603 payload: JobPayload {
604 time_ms: 1234567890 + i,
605 events: vec![sonic_rs::json!({
606 "name": "channel_occupied",
607 "channel": format!("test-channel-{}", i)
608 })],
609 },
610 original_signature: format!("test_signature_{i}"),
611 };
612
613 handles.push(tokio::spawn(async move {
614 sender_clone.process_webhook_job(job).await
615 }));
616 }
617
618 let results = futures::future::join_all(handles).await;
619 for result in results {
620 assert!(result.unwrap().is_ok());
621 }
622 }
623
624 #[test]
625 fn test_filter_events_for_webhook_respects_channel_prefix() {
626 let webhook_sender = WebhookSender::new(Arc::new(MemoryAppManager::new()));
627 let webhook = Webhook {
628 url: Some(url::Url::parse("http://localhost/webhook").unwrap()),
629 lambda_function: None,
630 lambda: None,
631 event_types: vec!["channel_occupied".to_string()],
632 filter: Some(WebhookFilter {
633 channel_prefix: Some("#server-to-user".to_string()),
634 channel_suffix: None,
635 channel_pattern: None,
636 }),
637 headers: None,
638 };
639
640 let filtered = webhook_sender.filter_events_for_webhook(
641 &[
642 sonic_rs::json!({
643 "name": "channel_occupied",
644 "channel": "#server-to-user-123"
645 }),
646 sonic_rs::json!({
647 "name": "channel_occupied",
648 "channel": "private-conversation.123"
649 }),
650 ],
651 &webhook,
652 );
653
654 assert_eq!(filtered.len(), 1);
655 assert_eq!(
656 filtered[0].get("channel").and_then(Value::as_str),
657 Some("#server-to-user-123")
658 );
659 }
660
661 #[test]
662 fn test_find_relevant_webhooks_splits_events_per_endpoint() {
663 let webhook_sender = WebhookSender::new(Arc::new(MemoryAppManager::new()));
664 let prefixed = Webhook {
665 url: Some(url::Url::parse("http://localhost/prefix").unwrap()),
666 lambda_function: None,
667 lambda: None,
668 event_types: vec!["channel_occupied".to_string()],
669 filter: Some(WebhookFilter {
670 channel_prefix: Some("#server-to-user".to_string()),
671 channel_suffix: None,
672 channel_pattern: None,
673 }),
674 headers: None,
675 };
676 let catch_all = Webhook {
677 url: Some(url::Url::parse("http://localhost/all").unwrap()),
678 lambda_function: None,
679 lambda: None,
680 event_types: vec!["channel_occupied".to_string()],
681 filter: None,
682 headers: None,
683 };
684
685 let webhooks = [prefixed.clone(), catch_all.clone()];
686 let relevant = webhook_sender.find_relevant_webhooks(
687 &[
688 sonic_rs::json!({
689 "name": "channel_occupied",
690 "channel": "#server-to-user-1"
691 }),
692 sonic_rs::json!({
693 "name": "channel_occupied",
694 "channel": "private-conversation.1"
695 }),
696 ],
697 &webhooks,
698 );
699
700 assert_eq!(relevant.len(), 2);
701 assert_eq!(relevant.get("http://localhost/prefix").unwrap().1.len(), 1);
702 assert_eq!(relevant.get("http://localhost/all").unwrap().1.len(), 2);
703 }
704}