Skip to main content

sockudo_webhook/
integration.rs

1use sockudo_core::app::App;
2use sockudo_core::app::AppManager;
3use sockudo_core::error::{Error, Result};
4use sockudo_core::queue::QueueInterface;
5use sockudo_core::webhook_types::{JobData, JobPayload, JobProcessorFnAsync};
6
7use crate::sender::WebhookSender;
8use ahash::AHashMap;
9use serde::{Deserialize, Serialize};
10use sonic_rs::{Value, json};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::time::interval;
15use tracing::{error, info, warn};
16
/// Configuration for the webhook integration.
///
/// Consumed by [`WebhookIntegration::new`], which stores it and consults `enabled` and
/// `batching` to decide whether and how webhook jobs are dispatched.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
    /// Master switch. While `false`, every `send_*` method returns `Ok(())` without
    /// queuing anything. `WebhookIntegration::new` also clears this flag when it is
    /// given no queue manager.
    pub enabled: bool,
    /// Settings for Sockudo's internal batching of webhook jobs.
    pub batching: BatchingConfig,
    /// Identifier for this process; `Default` fills it with a random UUID v4.
    /// Not read anywhere in this file.
    pub process_id: String,
    /// Debug flag. Not read anywhere in this file.
    /// NOTE(review): confirm which component consumes it.
    pub debug: bool,
}
25
26impl Default for WebhookConfig {
27    fn default() -> Self {
28        Self {
29            enabled: true,
30            batching: BatchingConfig::default(),
31            process_id: uuid::Uuid::new_v4().to_string(),
32            debug: false,
33        }
34    }
35}
36
/// Configuration for webhook batching (Sockudo's internal batching).
///
/// With batching on, `WebhookIntegration` buffers jobs in memory and a background task
/// pushes them to the queue on a fixed interval instead of enqueuing each job immediately.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchingConfig {
    /// Turns internal batching on or off.
    pub enabled: bool,
    /// Interval between flushes of the buffered jobs.
    pub duration: u64, // in milliseconds
    /// Maximum number of events placed in a single queued job when flushing;
    /// the flush path treats values below 1 as 1.
    pub size: usize,
}
44
45impl Default for BatchingConfig {
46    fn default() -> Self {
47        Self {
48            enabled: false,
49            duration: 50,
50            size: 100,
51        }
52    }
53}
54
/// Thin wrapper around a queue driver, mirroring the main crate's QueueManager.
/// This avoids a circular dependency while keeping the same API surface.
///
/// Every method forwards directly to the boxed [`QueueInterface`] driver.
pub struct QueueManager {
    /// The queue backend all operations are delegated to.
    driver: Box<dyn QueueInterface>,
}
60
61impl QueueManager {
62    pub fn new(driver: Box<dyn QueueInterface>) -> Self {
63        Self { driver }
64    }
65
66    pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
67        self.driver.add_to_queue(queue_name, data).await
68    }
69
70    pub async fn process_queue(
71        &self,
72        queue_name: &str,
73        callback: JobProcessorFnAsync,
74    ) -> Result<()> {
75        self.driver.process_queue(queue_name, callback).await
76    }
77
78    pub async fn disconnect(&self) -> Result<()> {
79        self.driver.disconnect().await
80    }
81
82    pub async fn check_health(&self) -> Result<()> {
83        self.driver.check_health().await
84    }
85}
86
/// Webhook integration for processing events.
///
/// Turns channel/member/client events into `JobData` and either enqueues them directly
/// or buffers them for the periodic batching task, depending on `config.batching`.
pub struct WebhookIntegration {
    /// Effective configuration; `enabled` may have been cleared by `new` when no queue
    /// manager was supplied.
    config: WebhookConfig,
    /// Jobs buffered while batching is enabled, keyed by queue name. Shared with the
    /// background task spawned by `start_batching_task`, which drains it on every tick.
    batched_webhooks: Arc<Mutex<AHashMap<String, Vec<JobData>>>>,
    /// Queue used for dispatch. Set by `setup_webhook_processor` once the job processor
    /// has been registered; `None` otherwise.
    queue_manager: Option<Arc<QueueManager>>,
    /// App manager handed to the `WebhookSender` created in `setup_webhook_processor`.
    app_manager: Arc<dyn AppManager + Send + Sync>,
}
94
95impl WebhookIntegration {
96    pub async fn new(
97        config: WebhookConfig,
98        app_manager: Arc<dyn AppManager + Send + Sync>,
99        queue_manager: Option<Arc<QueueManager>>,
100    ) -> Result<Self> {
101        let mut integration = Self {
102            config,
103            batched_webhooks: Arc::new(Mutex::new(AHashMap::new())),
104            queue_manager: None,
105            app_manager,
106        };
107
108        if integration.config.enabled {
109            if let Some(qm) = queue_manager {
110                integration.setup_webhook_processor(qm).await?;
111            } else {
112                warn!(
113                    "Webhooks are enabled but no queue manager provided, webhooks will be disabled"
114                );
115                integration.config.enabled = false;
116            }
117        }
118
119        if integration.config.enabled && integration.config.batching.enabled {
120            integration.start_batching_task();
121        }
122
123        Ok(integration)
124    }
125
126    async fn setup_webhook_processor(&mut self, queue_manager: Arc<QueueManager>) -> Result<()> {
127        let webhook_sender = Arc::new(WebhookSender::new(self.app_manager.clone()));
128        let queue_name = "webhooks".to_string();
129        let sender_clone = webhook_sender.clone();
130
131        let processor: JobProcessorFnAsync = Box::new(move |job_data| {
132            let sender_for_task = sender_clone.clone();
133            Box::pin(async move {
134                info!(
135                    "{}",
136                    format!("Processing webhook job from queue: {:?}", job_data.app_id)
137                );
138                sender_for_task.process_webhook_job(job_data).await
139            })
140        });
141
142        queue_manager.process_queue(&queue_name, processor).await?;
143        self.queue_manager = Some(queue_manager);
144        Ok(())
145    }
146
147    fn start_batching_task(&self) {
148        if !self.config.batching.enabled {
149            return;
150        }
151        let queue_manager_clone = self.queue_manager.clone();
152        let batched_webhooks_clone = self.batched_webhooks.clone();
153        let batch_duration = self.config.batching.duration;
154        let batch_size = self.config.batching.size.max(1);
155
156        tokio::spawn(async move {
157            let mut interval = interval(Duration::from_millis(batch_duration));
158            loop {
159                interval.tick().await;
160                let webhooks_to_process: AHashMap<String, Vec<JobData>> = {
161                    let mut batched = batched_webhooks_clone.lock().await;
162                    std::mem::take(&mut *batched)
163                };
164
165                if webhooks_to_process.is_empty() {
166                    continue;
167                }
168                info!(
169                    "{}",
170                    format!(
171                        "Processing {} batched webhook queues (Sockudo internal batching)",
172                        webhooks_to_process.len()
173                    )
174                );
175
176                if let Some(qm) = &queue_manager_clone {
177                    for (queue_name, jobs) in webhooks_to_process {
178                        for batch in Self::merge_jobs_for_queue(jobs, batch_size) {
179                            if let Err(e) = qm.add_to_queue(&queue_name, batch).await {
180                                error!(
181                                    "{}",
182                                    format!(
183                                        "Failed to add batched job to queue {}: {}",
184                                        queue_name, e
185                                    )
186                                );
187                            }
188                        }
189                    }
190                }
191            }
192        });
193    }
194
    /// Returns whether webhooks are currently enabled for this integration.
    ///
    /// This reflects the effective setting: `new` clears it when no queue manager was
    /// provided, even if the supplied config had it set.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }
198
199    async fn add_webhook(&self, queue_name: &str, job_data: JobData) -> Result<()> {
200        if !self.is_enabled() {
201            return Ok(());
202        }
203        if self.config.batching.enabled {
204            let mut batched = self.batched_webhooks.lock().await;
205            batched
206                .entry(queue_name.to_string())
207                .or_default()
208                .push(job_data);
209        } else if let Some(qm) = &self.queue_manager {
210            qm.add_to_queue(queue_name, job_data).await?;
211        } else {
212            return Err(Error::Internal(
213                "Queue manager not initialized for webhooks".to_string(),
214            ));
215        }
216        Ok(())
217    }
218
219    fn merge_jobs_for_queue(jobs: Vec<JobData>, batch_size: usize) -> Vec<JobData> {
220        let mut merged = Vec::new();
221        let mut current: Option<JobData> = None;
222        let batch_size = batch_size.max(1);
223
224        for job in jobs {
225            for chunk in Self::split_job_by_size(job, batch_size) {
226                match current.as_mut() {
227                    Some(existing)
228                        if existing.app_id == chunk.app_id
229                            && existing.app_key == chunk.app_key
230                            && existing.app_secret == chunk.app_secret
231                            && existing.payload.events.len() + chunk.payload.events.len()
232                                <= batch_size =>
233                    {
234                        existing.payload.time_ms =
235                            existing.payload.time_ms.min(chunk.payload.time_ms);
236                        existing.payload.events.extend(chunk.payload.events);
237                    }
238                    Some(_) => {
239                        if let Some(finished) = current.take() {
240                            merged.push(finished);
241                        }
242                        current = Some(chunk);
243                    }
244                    None => current = Some(chunk),
245                }
246            }
247        }
248
249        if let Some(finished) = current {
250            merged.push(finished);
251        }
252
253        merged
254    }
255
256    fn split_job_by_size(job: JobData, batch_size: usize) -> Vec<JobData> {
257        let batch_size = batch_size.max(1);
258        let JobData {
259            app_key,
260            app_id,
261            app_secret,
262            payload,
263            original_signature,
264        } = job;
265
266        let JobPayload { time_ms, events } = payload;
267        let mut chunks = Vec::new();
268
269        for event_chunk in events.chunks(batch_size) {
270            chunks.push(JobData {
271                app_key: app_key.clone(),
272                app_id: app_id.clone(),
273                app_secret: app_secret.clone(),
274                payload: JobPayload {
275                    time_ms,
276                    events: event_chunk.to_vec(),
277                },
278                original_signature: original_signature.clone(),
279            });
280        }
281
282        if chunks.is_empty() {
283            chunks.push(JobData {
284                app_key,
285                app_id,
286                app_secret,
287                payload: JobPayload {
288                    time_ms,
289                    events: Vec::new(),
290                },
291                original_signature,
292            });
293        }
294
295        chunks
296    }
297
298    fn create_job_data(
299        &self,
300        app: &App,
301        events_payload: Vec<Value>,
302        original_signature_for_queue: &str,
303    ) -> JobData {
304        let job_payload = JobPayload {
305            time_ms: chrono::Utc::now().timestamp_millis(),
306            events: events_payload,
307        };
308        JobData {
309            app_key: app.key.clone(),
310            app_id: app.id.clone(),
311            app_secret: app.secret.clone(),
312            payload: job_payload,
313            original_signature: original_signature_for_queue.to_string(),
314        }
315    }
316
317    async fn should_send_webhook(&self, app: &App, event_type_name: &str) -> bool {
318        if !self.is_enabled() {
319            return false;
320        }
321        app.webhooks.as_ref().is_some_and(|webhooks| {
322            webhooks
323                .iter()
324                .any(|wh_config| wh_config.event_types.contains(&event_type_name.to_string()))
325        })
326    }
327
328    pub async fn send_channel_occupied(&self, app: &App, channel: &str) -> Result<()> {
329        if !self.should_send_webhook(app, "channel_occupied").await {
330            return Ok(());
331        }
332        let event_obj = json!({
333            "name": "channel_occupied",
334            "channel": channel
335        });
336        let signature = format!("{}:{}:channel_occupied", app.id, channel);
337        let job_data = self.create_job_data(app, vec![event_obj], &signature);
338
339        self.add_webhook("webhooks", job_data).await
340    }
341
342    pub async fn send_channel_vacated(&self, app: &App, channel: &str) -> Result<()> {
343        if !self.should_send_webhook(app, "channel_vacated").await {
344            return Ok(());
345        }
346        let event_obj = json!({
347            "name": "channel_vacated",
348            "channel": channel
349        });
350        let signature = format!("{}:{}:channel_vacated", app.id, channel);
351        let job_data = self.create_job_data(app, vec![event_obj], &signature);
352        self.add_webhook("webhooks", job_data).await
353    }
354
355    pub async fn send_member_added(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
356        if !self.should_send_webhook(app, "member_added").await {
357            return Ok(());
358        }
359        let event_obj = json!({
360            "name": "member_added",
361            "channel": channel,
362            "user_id": user_id
363        });
364        let signature = format!("{}:{}:{}:member_added", app.id, channel, user_id);
365        let job_data = self.create_job_data(app, vec![event_obj], &signature);
366        self.add_webhook("webhooks", job_data).await
367    }
368
369    pub async fn send_member_removed(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
370        if !self.should_send_webhook(app, "member_removed").await {
371            return Ok(());
372        }
373        let event_obj = json!({
374            "name": "member_removed",
375            "channel": channel,
376            "user_id": user_id
377        });
378        let signature = format!("{}:{}:{}:member_removed", app.id, channel, user_id);
379        let job_data = self.create_job_data(app, vec![event_obj], &signature);
380        self.add_webhook("webhooks", job_data).await
381    }
382
383    pub async fn send_client_event(
384        &self,
385        app: &App,
386        channel: &str,
387        event_name: &str,
388        event_data: Value,
389        socket_id: Option<&str>,
390        user_id: Option<&str>,
391    ) -> Result<()> {
392        if !self.should_send_webhook(app, "client_event").await {
393            return Ok(());
394        }
395
396        let mut client_event_pusher_payload = json!({
397            "name": "client_event",
398            "channel": channel,
399            "event": event_name,
400            "data": event_data,
401            "socket_id": socket_id,
402        });
403
404        if channel.starts_with("presence-")
405            && let Some(uid) = user_id
406        {
407            client_event_pusher_payload["user_id"] = json!(uid);
408        }
409
410        let signature = format!(
411            "{}:{}:{}:client_event",
412            app.id,
413            channel,
414            socket_id.unwrap_or("unknown")
415        );
416        let job_data = self.create_job_data(app, vec![client_event_pusher_payload], &signature);
417        self.add_webhook("webhooks", job_data).await
418    }
419
420    pub async fn send_cache_missed(&self, app: &App, channel: &str) -> Result<()> {
421        if !self.should_send_webhook(app, "cache_miss").await {
422            return Ok(());
423        }
424        let event_obj = json!({
425            "name": "cache_miss",
426            "channel": channel,
427            "data" : "{}"
428        });
429        let signature = format!("{}:{}:cache_miss", app.id, channel);
430        let job_data = self.create_job_data(app, vec![event_obj], &signature);
431        self.add_webhook("webhooks", job_data).await
432    }
433
434    /// Sends a webhook when the subscription count for a channel changes.
435    pub async fn send_subscription_count_changed(
436        &self,
437        app: &App,
438        channel: &str,
439        subscription_count: usize,
440    ) -> Result<()> {
441        if !self.should_send_webhook(app, "subscription_count").await {
442            return Ok(());
443        }
444
445        let event_obj = json!({
446            "name": "subscription_count",
447            "channel": channel,
448            "subscription_count": subscription_count
449        });
450
451        let signature = format!(
452            "{}:{}:subscription_count:{}",
453            app.id, channel, subscription_count
454        );
455
456        let job_data = self.create_job_data(app, vec![event_obj], &signature);
457        self.add_webhook("webhooks", job_data).await
458    }
459
460    /// Check the health of the queue manager used by webhook integration
461    pub async fn check_queue_health(&self) -> Result<()> {
462        if let Some(qm) = &self.queue_manager {
463            qm.check_health().await
464        } else {
465            Ok(())
466        }
467    }
468}
469
#[cfg(test)]
mod tests {
    use super::*;
    use sockudo_app::memory_app_manager::MemoryAppManager;
    use sockudo_core::webhook_types::{JobData, JobPayload};
    use sockudo_queue::manager::QueueManagerFactory;

    /// Builds a queue manager backed by the `"memory"` driver.
    async fn create_test_queue_manager() -> Arc<QueueManager> {
        let driver = QueueManagerFactory::create("memory", None, None, None)
            .await
            .expect("Failed to create test queue manager");
        Arc::new(QueueManager::new(driver))
    }

    /// The `App` fixture shared by every `send_*` test.
    ///
    /// NOTE(review): no `webhooks` are set here. If `App::default()` leaves that field
    /// empty, `should_send_webhook` returns `false` and the `send_*` calls below return
    /// `Ok(())` before anything is queued — confirm and consider adding a fixture with
    /// webhooks configured.
    fn test_app() -> App {
        App {
            id: "test_app".to_string(),
            key: "test_key".to_string(),
            secret: "test_secret".to_string(),
            max_connections: 100,
            enable_client_messages: true,
            enabled: true,
            max_client_events_per_second: 100,
            ..Default::default()
        }
    }

    /// Builds an integration over a fresh in-memory app manager and memory queue.
    async fn create_test_integration(config: WebhookConfig) -> WebhookIntegration {
        let app_manager = Arc::new(MemoryAppManager::new());
        let queue_manager = create_test_queue_manager().await;
        WebhookIntegration::new(config, app_manager, Some(queue_manager))
            .await
            .expect("Failed to create test webhook integration")
    }

    /// Builds a job for app `app-<suffix>` (key `key-<suffix>`, secret `secret-<suffix>`).
    fn test_job(app_suffix: &str, time_ms: i64, signature: &str, events: Vec<Value>) -> JobData {
        JobData {
            app_key: format!("key-{app_suffix}"),
            app_id: format!("app-{app_suffix}"),
            app_secret: format!("secret-{app_suffix}"),
            payload: JobPayload { time_ms, events },
            original_signature: signature.to_string(),
        }
    }

    /// Builds a minimal channel event body.
    fn channel_event(name: &str, channel: &str) -> Value {
        json!({"name": name, "channel": channel})
    }

    #[tokio::test]
    async fn test_send_cache_missed() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration.send_cache_missed(&app, "test_channel").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_send_subscription_count_changed() {
        let app = test_app();

        let integration = create_test_integration(WebhookConfig::default()).await;
        let result = integration
            .send_subscription_count_changed(&app, "test_channel", 5)
            .await;
        assert!(result.is_ok());

        // Same call again with `enabled` spelled out explicitly.
        let config = WebhookConfig {
            enabled: true,
            ..Default::default()
        };
        let integration = create_test_integration(config).await;
        let result = integration
            .send_subscription_count_changed(&app, "test_channel", 5)
            .await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_webhook_config_serialization() {
        let config = WebhookConfig {
            enabled: true,
            batching: BatchingConfig {
                enabled: true,
                duration: 1000,
                size: 50,
            },
            process_id: "test-process".to_string(),
            debug: false,
        };

        let serialized = sonic_rs::to_string(&config).unwrap();
        let deserialized: WebhookConfig = sonic_rs::from_str(&serialized).unwrap();

        assert_eq!(config.enabled, deserialized.enabled);
        assert_eq!(config.batching.enabled, deserialized.batching.enabled);
        assert_eq!(config.batching.duration, deserialized.batching.duration);
        assert_eq!(config.batching.size, deserialized.batching.size);
    }

    #[tokio::test]
    async fn test_webhook_integration_new() {
        let app_manager = Arc::new(MemoryAppManager::new());
        let queue_manager = create_test_queue_manager().await;

        let integration =
            WebhookIntegration::new(WebhookConfig::default(), app_manager, Some(queue_manager))
                .await;
        assert!(integration.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_event() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_client_event(
                &app,
                "test_channel",
                "test_event",
                json!("test_data"),
                None,
                None,
            )
            .await;
        assert!(result.is_ok());
    }

    // Currently exercises the same call as `test_webhook_integration_send_event`.
    #[tokio::test]
    async fn test_webhook_integration_send_client_event() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_client_event(
                &app,
                "test_channel",
                "test_event",
                json!("test_data"),
                None,
                None,
            )
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_member_added() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_member_added(&app, "test_channel", "test_user")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_member_removed() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_member_removed(&app, "test_channel", "test_user")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_channel_occupied() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_channel_occupied(&app, "test_channel")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_channel_vacated() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration.send_channel_vacated(&app, "test_channel").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_subscription_count_changed() {
        let app = test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_subscription_count_changed(&app, "test_channel", 5)
            .await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_merge_jobs_for_queue_batches_by_app_and_size() {
        let jobs = vec![
            test_job("a", 10, "sig-1", vec![channel_event("channel_occupied", "one")]),
            test_job("a", 20, "sig-2", vec![channel_event("channel_vacated", "two")]),
            test_job("b", 30, "sig-3", vec![channel_event("channel_occupied", "three")]),
        ];

        let merged = WebhookIntegration::merge_jobs_for_queue(jobs, 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].app_id, "app-a");
        assert_eq!(merged[0].payload.events.len(), 2);
        // A merged batch keeps the earliest timestamp of its parts.
        assert_eq!(merged[0].payload.time_ms, 10);
        assert_eq!(merged[1].app_id, "app-b");
        assert_eq!(merged[1].payload.events.len(), 1);
    }

    #[test]
    fn test_merge_jobs_for_queue_splits_oversized_jobs() {
        let job = test_job(
            "a",
            10,
            "sig-1",
            vec![
                channel_event("channel_occupied", "one"),
                channel_event("channel_occupied", "two"),
                channel_event("channel_occupied", "three"),
            ],
        );

        let merged = WebhookIntegration::merge_jobs_for_queue(vec![job], 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].payload.events.len(), 2);
        assert_eq!(merged[1].payload.events.len(), 1);
    }
}