1use sockudo_core::app::App;
2use sockudo_core::app::AppManager;
3use sockudo_core::error::{Error, Result};
4use sockudo_core::queue::QueueInterface;
5use sockudo_core::webhook_types::{JobData, JobPayload, JobProcessorFnAsync};
6
7use crate::sender::WebhookSender;
8use ahash::AHashMap;
9use serde::{Deserialize, Serialize};
10use sonic_rs::{Value, json};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::time::interval;
15use tracing::{error, info, warn};
16
/// Top-level settings for the webhook integration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
    /// Master switch: when `false`, `WebhookIntegration` queues nothing and
    /// every `send_*` call returns `Ok(())` without doing any work.
    pub enabled: bool,
    /// Controls in-process batching of webhook jobs before they are queued.
    pub batching: BatchingConfig,
    /// Identifier string for this process; `Default` fills it with a random
    /// UUID v4. Not read anywhere else in this file.
    pub process_id: String,
    /// Debug flag. Not read anywhere in this file.
    pub debug: bool,
}
25
26impl Default for WebhookConfig {
27 fn default() -> Self {
28 Self {
29 enabled: true,
30 batching: BatchingConfig::default(),
31 process_id: uuid::Uuid::new_v4().to_string(),
32 debug: false,
33 }
34 }
35}
36
/// Settings for batching webhook jobs in memory before handing them to the queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchingConfig {
    /// When `true`, jobs are buffered in memory and flushed periodically
    /// instead of being queued immediately.
    pub enabled: bool,
    /// Flush period in milliseconds (passed to `Duration::from_millis`).
    pub duration: u64,
    /// Maximum number of events placed into a single queued job when flushing.
    pub size: usize,
}
44
45impl Default for BatchingConfig {
46 fn default() -> Self {
47 Self {
48 enabled: false,
49 duration: 50,
50 size: 100,
51 }
52 }
53}
54
/// Thin wrapper that owns a boxed [`QueueInterface`] driver and forwards
/// every operation to it.
pub struct QueueManager {
    /// The concrete queue backend all calls are delegated to.
    driver: Box<dyn QueueInterface>,
}
60
impl QueueManager {
    /// Wraps the given queue driver.
    pub fn new(driver: Box<dyn QueueInterface>) -> Self {
        Self { driver }
    }

    /// Enqueues `data` on the queue called `queue_name` by delegating to the driver.
    ///
    /// # Errors
    /// Returns whatever error the driver's `add_to_queue` reports.
    pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
        self.driver.add_to_queue(queue_name, data).await
    }

    /// Registers `callback` as the job processor for `queue_name` by
    /// delegating to the driver.
    ///
    /// # Errors
    /// Returns whatever error the driver's `process_queue` reports.
    pub async fn process_queue(
        &self,
        queue_name: &str,
        callback: JobProcessorFnAsync,
    ) -> Result<()> {
        self.driver.process_queue(queue_name, callback).await
    }

    /// Asks the driver to disconnect.
    ///
    /// # Errors
    /// Returns whatever error the driver's `disconnect` reports.
    pub async fn disconnect(&self) -> Result<()> {
        self.driver.disconnect().await
    }

    /// Runs the driver's health check.
    ///
    /// # Errors
    /// Returns whatever error the driver's `check_health` reports.
    pub async fn check_health(&self) -> Result<()> {
        self.driver.check_health().await
    }
}
86
/// Builds webhook jobs for application events and hands them to a queue,
/// optionally buffering them in memory first.
pub struct WebhookIntegration {
    /// Effective configuration; `enabled` is forced to `false` by `new` when
    /// no queue manager is supplied.
    config: WebhookConfig,
    /// Jobs waiting for the next batch flush, keyed by queue name.
    batched_webhooks: Arc<Mutex<AHashMap<String, Vec<JobData>>>>,
    /// Queue used for delivery; stays `None` until the webhook processor has
    /// been registered successfully.
    queue_manager: Option<Arc<QueueManager>>,
    /// App lookup handle passed on to the `WebhookSender`.
    app_manager: Arc<dyn AppManager + Send + Sync>,
}
94
95impl WebhookIntegration {
96 pub async fn new(
97 config: WebhookConfig,
98 app_manager: Arc<dyn AppManager + Send + Sync>,
99 queue_manager: Option<Arc<QueueManager>>,
100 ) -> Result<Self> {
101 let mut integration = Self {
102 config,
103 batched_webhooks: Arc::new(Mutex::new(AHashMap::new())),
104 queue_manager: None,
105 app_manager,
106 };
107
108 if integration.config.enabled {
109 if let Some(qm) = queue_manager {
110 integration.setup_webhook_processor(qm).await?;
111 } else {
112 warn!(
113 "Webhooks are enabled but no queue manager provided, webhooks will be disabled"
114 );
115 integration.config.enabled = false;
116 }
117 }
118
119 if integration.config.enabled && integration.config.batching.enabled {
120 integration.start_batching_task();
121 }
122
123 Ok(integration)
124 }
125
126 async fn setup_webhook_processor(&mut self, queue_manager: Arc<QueueManager>) -> Result<()> {
127 let webhook_sender = Arc::new(WebhookSender::new(self.app_manager.clone()));
128 let queue_name = "webhooks".to_string();
129 let sender_clone = webhook_sender.clone();
130
131 let processor: JobProcessorFnAsync = Box::new(move |job_data| {
132 let sender_for_task = sender_clone.clone();
133 Box::pin(async move {
134 info!(
135 "{}",
136 format!("Processing webhook job from queue: {:?}", job_data.app_id)
137 );
138 sender_for_task.process_webhook_job(job_data).await
139 })
140 });
141
142 queue_manager.process_queue(&queue_name, processor).await?;
143 self.queue_manager = Some(queue_manager);
144 Ok(())
145 }
146
147 fn start_batching_task(&self) {
148 if !self.config.batching.enabled {
149 return;
150 }
151 let queue_manager_clone = self.queue_manager.clone();
152 let batched_webhooks_clone = self.batched_webhooks.clone();
153 let batch_duration = self.config.batching.duration;
154 let batch_size = self.config.batching.size.max(1);
155
156 tokio::spawn(async move {
157 let mut interval = interval(Duration::from_millis(batch_duration));
158 loop {
159 interval.tick().await;
160 let webhooks_to_process: AHashMap<String, Vec<JobData>> = {
161 let mut batched = batched_webhooks_clone.lock().await;
162 std::mem::take(&mut *batched)
163 };
164
165 if webhooks_to_process.is_empty() {
166 continue;
167 }
168 info!(
169 "{}",
170 format!(
171 "Processing {} batched webhook queues (Sockudo internal batching)",
172 webhooks_to_process.len()
173 )
174 );
175
176 if let Some(qm) = &queue_manager_clone {
177 for (queue_name, jobs) in webhooks_to_process {
178 for batch in Self::merge_jobs_for_queue(jobs, batch_size) {
179 if let Err(e) = qm.add_to_queue(&queue_name, batch).await {
180 error!(
181 "{}",
182 format!(
183 "Failed to add batched job to queue {}: {}",
184 queue_name, e
185 )
186 );
187 }
188 }
189 }
190 }
191 }
192 });
193 }
194
195 pub fn is_enabled(&self) -> bool {
196 self.config.enabled
197 }
198
199 async fn add_webhook(&self, queue_name: &str, job_data: JobData) -> Result<()> {
200 if !self.is_enabled() {
201 return Ok(());
202 }
203 if self.config.batching.enabled {
204 let mut batched = self.batched_webhooks.lock().await;
205 batched
206 .entry(queue_name.to_string())
207 .or_default()
208 .push(job_data);
209 } else if let Some(qm) = &self.queue_manager {
210 qm.add_to_queue(queue_name, job_data).await?;
211 } else {
212 return Err(Error::Internal(
213 "Queue manager not initialized for webhooks".to_string(),
214 ));
215 }
216 Ok(())
217 }
218
219 fn merge_jobs_for_queue(jobs: Vec<JobData>, batch_size: usize) -> Vec<JobData> {
220 let mut merged = Vec::new();
221 let mut current: Option<JobData> = None;
222 let batch_size = batch_size.max(1);
223
224 for job in jobs {
225 for chunk in Self::split_job_by_size(job, batch_size) {
226 match current.as_mut() {
227 Some(existing)
228 if existing.app_id == chunk.app_id
229 && existing.app_key == chunk.app_key
230 && existing.app_secret == chunk.app_secret
231 && existing.payload.events.len() + chunk.payload.events.len()
232 <= batch_size =>
233 {
234 existing.payload.time_ms =
235 existing.payload.time_ms.min(chunk.payload.time_ms);
236 existing.payload.events.extend(chunk.payload.events);
237 }
238 Some(_) => {
239 if let Some(finished) = current.take() {
240 merged.push(finished);
241 }
242 current = Some(chunk);
243 }
244 None => current = Some(chunk),
245 }
246 }
247 }
248
249 if let Some(finished) = current {
250 merged.push(finished);
251 }
252
253 merged
254 }
255
256 fn split_job_by_size(job: JobData, batch_size: usize) -> Vec<JobData> {
257 let batch_size = batch_size.max(1);
258 let JobData {
259 app_key,
260 app_id,
261 app_secret,
262 payload,
263 original_signature,
264 } = job;
265
266 let JobPayload { time_ms, events } = payload;
267 let mut chunks = Vec::new();
268
269 for event_chunk in events.chunks(batch_size) {
270 chunks.push(JobData {
271 app_key: app_key.clone(),
272 app_id: app_id.clone(),
273 app_secret: app_secret.clone(),
274 payload: JobPayload {
275 time_ms,
276 events: event_chunk.to_vec(),
277 },
278 original_signature: original_signature.clone(),
279 });
280 }
281
282 if chunks.is_empty() {
283 chunks.push(JobData {
284 app_key,
285 app_id,
286 app_secret,
287 payload: JobPayload {
288 time_ms,
289 events: Vec::new(),
290 },
291 original_signature,
292 });
293 }
294
295 chunks
296 }
297
298 fn create_job_data(
299 &self,
300 app: &App,
301 events_payload: Vec<Value>,
302 original_signature_for_queue: &str,
303 ) -> JobData {
304 let job_payload = JobPayload {
305 time_ms: chrono::Utc::now().timestamp_millis(),
306 events: events_payload,
307 };
308 JobData {
309 app_key: app.key.clone(),
310 app_id: app.id.clone(),
311 app_secret: app.secret.clone(),
312 payload: job_payload,
313 original_signature: original_signature_for_queue.to_string(),
314 }
315 }
316
317 async fn should_send_webhook(&self, app: &App, event_type_name: &str) -> bool {
318 if !self.is_enabled() {
319 return false;
320 }
321 app.webhooks.as_ref().is_some_and(|webhooks| {
322 webhooks
323 .iter()
324 .any(|wh_config| wh_config.event_types.contains(&event_type_name.to_string()))
325 })
326 }
327
328 pub async fn send_channel_occupied(&self, app: &App, channel: &str) -> Result<()> {
329 if !self.should_send_webhook(app, "channel_occupied").await {
330 return Ok(());
331 }
332 let event_obj = json!({
333 "name": "channel_occupied",
334 "channel": channel
335 });
336 let signature = format!("{}:{}:channel_occupied", app.id, channel);
337 let job_data = self.create_job_data(app, vec![event_obj], &signature);
338
339 self.add_webhook("webhooks", job_data).await
340 }
341
342 pub async fn send_channel_vacated(&self, app: &App, channel: &str) -> Result<()> {
343 if !self.should_send_webhook(app, "channel_vacated").await {
344 return Ok(());
345 }
346 let event_obj = json!({
347 "name": "channel_vacated",
348 "channel": channel
349 });
350 let signature = format!("{}:{}:channel_vacated", app.id, channel);
351 let job_data = self.create_job_data(app, vec![event_obj], &signature);
352 self.add_webhook("webhooks", job_data).await
353 }
354
355 pub async fn send_member_added(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
356 if !self.should_send_webhook(app, "member_added").await {
357 return Ok(());
358 }
359 let event_obj = json!({
360 "name": "member_added",
361 "channel": channel,
362 "user_id": user_id
363 });
364 let signature = format!("{}:{}:{}:member_added", app.id, channel, user_id);
365 let job_data = self.create_job_data(app, vec![event_obj], &signature);
366 self.add_webhook("webhooks", job_data).await
367 }
368
369 pub async fn send_member_removed(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
370 if !self.should_send_webhook(app, "member_removed").await {
371 return Ok(());
372 }
373 let event_obj = json!({
374 "name": "member_removed",
375 "channel": channel,
376 "user_id": user_id
377 });
378 let signature = format!("{}:{}:{}:member_removed", app.id, channel, user_id);
379 let job_data = self.create_job_data(app, vec![event_obj], &signature);
380 self.add_webhook("webhooks", job_data).await
381 }
382
383 pub async fn send_client_event(
384 &self,
385 app: &App,
386 channel: &str,
387 event_name: &str,
388 event_data: Value,
389 socket_id: Option<&str>,
390 user_id: Option<&str>,
391 ) -> Result<()> {
392 if !self.should_send_webhook(app, "client_event").await {
393 return Ok(());
394 }
395
396 let mut client_event_pusher_payload = json!({
397 "name": "client_event",
398 "channel": channel,
399 "event": event_name,
400 "data": event_data,
401 "socket_id": socket_id,
402 });
403
404 if channel.starts_with("presence-")
405 && let Some(uid) = user_id
406 {
407 client_event_pusher_payload["user_id"] = json!(uid);
408 }
409
410 let signature = format!(
411 "{}:{}:{}:client_event",
412 app.id,
413 channel,
414 socket_id.unwrap_or("unknown")
415 );
416 let job_data = self.create_job_data(app, vec![client_event_pusher_payload], &signature);
417 self.add_webhook("webhooks", job_data).await
418 }
419
420 pub async fn send_cache_missed(&self, app: &App, channel: &str) -> Result<()> {
421 if !self.should_send_webhook(app, "cache_miss").await {
422 return Ok(());
423 }
424 let event_obj = json!({
425 "name": "cache_miss",
426 "channel": channel,
427 "data" : "{}"
428 });
429 let signature = format!("{}:{}:cache_miss", app.id, channel);
430 let job_data = self.create_job_data(app, vec![event_obj], &signature);
431 self.add_webhook("webhooks", job_data).await
432 }
433
434 pub async fn send_subscription_count_changed(
436 &self,
437 app: &App,
438 channel: &str,
439 subscription_count: usize,
440 ) -> Result<()> {
441 if !self.should_send_webhook(app, "subscription_count").await {
442 return Ok(());
443 }
444
445 let event_obj = json!({
446 "name": "subscription_count",
447 "channel": channel,
448 "subscription_count": subscription_count
449 });
450
451 let signature = format!(
452 "{}:{}:subscription_count:{}",
453 app.id, channel, subscription_count
454 );
455
456 let job_data = self.create_job_data(app, vec![event_obj], &signature);
457 self.add_webhook("webhooks", job_data).await
458 }
459
460 pub async fn check_queue_health(&self) -> Result<()> {
462 if let Some(qm) = &self.queue_manager {
463 qm.check_health().await
464 } else {
465 Ok(())
466 }
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;
    use sockudo_app::memory_app_manager::MemoryAppManager;
    use sockudo_core::webhook_types::{JobData, JobPayload};
    use sockudo_queue::manager::QueueManagerFactory;

    /// Builds a queue manager backed by the in-memory driver.
    async fn create_test_queue_manager() -> Arc<QueueManager> {
        let driver = QueueManagerFactory::create("memory", None, None, None)
            .await
            .expect("Failed to create test queue manager");
        Arc::new(QueueManager::new(driver))
    }

    /// App fixture shared by the `send_*` tests.
    ///
    /// NOTE(review): `webhooks` is left at its `Default` value. If that is
    /// `None`, every `send_*` call returns early from `should_send_webhook`
    /// and these tests cover only the no-op path. Confirm, and consider adding
    /// an app with configured webhooks.
    fn create_test_app() -> App {
        App {
            id: "test_app".to_string(),
            key: "test_key".to_string(),
            secret: "test_secret".to_string(),
            max_connections: 100,
            enable_client_messages: true,
            enabled: true,
            max_client_events_per_second: 100,
            ..Default::default()
        }
    }

    /// Builds an integration over a fresh in-memory app manager and queue.
    async fn create_test_integration(config: WebhookConfig) -> WebhookIntegration {
        let app_manager = Arc::new(MemoryAppManager::new());
        let queue_manager = create_test_queue_manager().await;
        WebhookIntegration::new(config, app_manager, Some(queue_manager))
            .await
            .expect("Failed to create test webhook integration")
    }

    /// Builds a job for app `app-<suffix>` with matching key and secret.
    fn create_test_job(suffix: &str, time_ms: i64, signature: &str, events: Vec<Value>) -> JobData {
        JobData {
            app_key: format!("key-{suffix}"),
            app_id: format!("app-{suffix}"),
            app_secret: format!("secret-{suffix}"),
            payload: JobPayload { time_ms, events },
            original_signature: signature.to_string(),
        }
    }

    #[tokio::test]
    async fn test_send_cache_missed() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration.send_cache_missed(&app, "test_channel").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_send_subscription_count_changed() {
        let app = create_test_app();

        let integration = create_test_integration(WebhookConfig::default()).await;
        let result = integration
            .send_subscription_count_changed(&app, "test_channel", 5)
            .await;
        assert!(result.is_ok());

        // Same call again on an integration whose config sets `enabled` explicitly.
        let config = WebhookConfig {
            enabled: true,
            ..Default::default()
        };
        let integration = create_test_integration(config).await;
        let result = integration
            .send_subscription_count_changed(&app, "test_channel", 5)
            .await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_webhook_config_serialization() {
        let config = WebhookConfig {
            enabled: true,
            batching: BatchingConfig {
                enabled: true,
                duration: 1000,
                size: 50,
            },
            process_id: "test-process".to_string(),
            debug: false,
        };

        let serialized = sonic_rs::to_string(&config).unwrap();
        let deserialized: WebhookConfig = sonic_rs::from_str(&serialized).unwrap();

        assert_eq!(config.enabled, deserialized.enabled);
        assert_eq!(config.batching.enabled, deserialized.batching.enabled);
        assert_eq!(config.batching.duration, deserialized.batching.duration);
        assert_eq!(config.batching.size, deserialized.batching.size);
    }

    #[tokio::test]
    async fn test_webhook_integration_new() {
        let app_manager = Arc::new(MemoryAppManager::new());
        let queue_manager = create_test_queue_manager().await;

        let integration =
            WebhookIntegration::new(WebhookConfig::default(), app_manager, Some(queue_manager))
                .await;
        assert!(integration.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_disabled_without_queue_manager() {
        let app_manager = Arc::new(MemoryAppManager::new());

        let integration = WebhookIntegration::new(WebhookConfig::default(), app_manager, None)
            .await
            .unwrap();

        // `new` turns webhooks off when there is no queue to deliver to.
        assert!(!integration.is_enabled());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_event() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_client_event(
                &app,
                "test_channel",
                "test_event",
                json!("test_data"),
                None,
                None,
            )
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_client_event() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_client_event(
                &app,
                "test_channel",
                "test_event",
                json!("test_data"),
                None,
                None,
            )
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_member_added() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_member_added(&app, "test_channel", "test_user")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_member_removed() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_member_removed(&app, "test_channel", "test_user")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_channel_occupied() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_channel_occupied(&app, "test_channel")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_channel_vacated() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration.send_channel_vacated(&app, "test_channel").await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_webhook_integration_send_subscription_count_changed() {
        let app = create_test_app();
        let integration = create_test_integration(WebhookConfig::default()).await;

        let result = integration
            .send_subscription_count_changed(&app, "test_channel", 5)
            .await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_merge_jobs_for_queue_batches_by_app_and_size() {
        let jobs = vec![
            create_test_job(
                "a",
                10,
                "sig-1",
                vec![json!({"name": "channel_occupied", "channel": "one"})],
            ),
            create_test_job(
                "a",
                20,
                "sig-2",
                vec![json!({"name": "channel_vacated", "channel": "two"})],
            ),
            create_test_job(
                "b",
                30,
                "sig-3",
                vec![json!({"name": "channel_occupied", "channel": "three"})],
            ),
        ];

        let merged = WebhookIntegration::merge_jobs_for_queue(jobs, 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].app_id, "app-a");
        assert_eq!(merged[0].payload.events.len(), 2);
        // A merged batch keeps the earliest timestamp and the first signature.
        assert_eq!(merged[0].payload.time_ms, 10);
        assert_eq!(merged[0].original_signature, "sig-1");
        assert_eq!(merged[1].app_id, "app-b");
        assert_eq!(merged[1].payload.events.len(), 1);
    }

    #[test]
    fn test_merge_jobs_for_queue_splits_oversized_jobs() {
        let job = create_test_job(
            "a",
            10,
            "sig-1",
            vec![
                json!({"name": "channel_occupied", "channel": "one"}),
                json!({"name": "channel_occupied", "channel": "two"}),
                json!({"name": "channel_occupied", "channel": "three"}),
            ],
        );

        let merged = WebhookIntegration::merge_jobs_for_queue(vec![job], 2);

        assert_eq!(merged.len(), 2);
        assert_eq!(merged[0].payload.events.len(), 2);
        assert_eq!(merged[1].payload.events.len(), 1);
    }
}