Skip to main content

sockudo_webhook/
integration.rs

1use sockudo_core::app::App;
2use sockudo_core::app::AppManager;
3use sockudo_core::error::{Error, Result};
4use sockudo_core::queue::QueueInterface;
5use sockudo_core::webhook_types::{JobData, JobPayload, JobProcessorFnAsync};
6
7use crate::sender::WebhookSender;
8use ahash::AHashMap;
9use serde::{Deserialize, Serialize};
10use sonic_rs::{Value, json};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::time::interval;
15use tracing::{error, info, warn};
16
/// Configuration for the webhook integration.
///
/// Consumed by `WebhookIntegration::new`, which clears `enabled` when webhooks
/// are requested but no queue manager is supplied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
    /// Master switch: when `false`, no webhook is evaluated or enqueued.
    pub enabled: bool,
    /// Settings for Sockudo's internal batching of webhook jobs.
    pub batching: BatchingConfig,
    /// Identifier of this process; `Default` fills it with a random UUID v4 string.
    /// Not read anywhere else in this file.
    pub process_id: String,
    /// Debug flag; not read anywhere in this file.
    pub debug: bool,
}
25
26impl Default for WebhookConfig {
27    fn default() -> Self {
28        Self {
29            enabled: true,
30            batching: BatchingConfig::default(),
31            process_id: uuid::Uuid::new_v4().to_string(),
32            debug: false,
33        }
34    }
35}
36
/// Configuration for webhook batching (Sockudo's internal batching)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchingConfig {
    /// When `true`, webhook jobs are buffered in memory and flushed periodically
    /// instead of being pushed to the queue one by one.
    pub enabled: bool,
    /// Interval between flushes of the in-memory buffer, in milliseconds.
    pub duration: u64, // in milliseconds
    /// Maximum number of events placed in a single queued job when flushing
    /// (the flush task treats values below 1 as 1).
    pub size: usize,
}
44
45impl Default for BatchingConfig {
46    fn default() -> Self {
47        Self {
48            enabled: false,
49            duration: 50,
50            size: 100,
51        }
52    }
53}
54
/// Thin wrapper around a queue driver, mirroring the main crate's QueueManager.
/// This avoids a circular dependency while keeping the same API surface.
pub struct QueueManager {
    /// The concrete queue backend every method of this type delegates to.
    driver: Box<dyn QueueInterface>,
}
60
impl QueueManager {
    /// Wraps the given queue driver.
    pub fn new(driver: Box<dyn QueueInterface>) -> Self {
        Self { driver }
    }

    /// Hands `data` to the driver's `add_to_queue` for `queue_name` and returns
    /// the driver's result unchanged.
    pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
        self.driver.add_to_queue(queue_name, data).await
    }

    /// Registers `callback` with the driver's `process_queue` for `queue_name`
    /// and returns the driver's result unchanged.
    pub async fn process_queue(
        &self,
        queue_name: &str,
        callback: JobProcessorFnAsync,
    ) -> Result<()> {
        self.driver.process_queue(queue_name, callback).await
    }

    /// Delegates to the driver's `disconnect`.
    pub async fn disconnect(&self) -> Result<()> {
        self.driver.disconnect().await
    }

    /// Delegates to the driver's `check_health`.
    pub async fn check_health(&self) -> Result<()> {
        self.driver.check_health().await
    }
}
86
/// Webhook integration for processing events
pub struct WebhookIntegration {
    /// Effective configuration; `enabled` may have been cleared during construction.
    config: WebhookConfig,
    /// Jobs waiting for the next batch flush, keyed by target queue name.
    /// Shared with the background flush task, hence the `Arc<Mutex<..>>`.
    batched_webhooks: Arc<Mutex<AHashMap<String, Vec<JobData>>>>,
    /// Queue used to publish and consume webhook jobs; `None` until the
    /// processor has been registered successfully.
    queue_manager: Option<Arc<QueueManager>>,
    /// App manager handed to the `WebhookSender` that processes queued jobs.
    app_manager: Arc<dyn AppManager + Send + Sync>,
}
94
95impl WebhookIntegration {
96    pub async fn new(
97        config: WebhookConfig,
98        app_manager: Arc<dyn AppManager + Send + Sync>,
99        queue_manager: Option<Arc<QueueManager>>,
100    ) -> Result<Self> {
101        let mut integration = Self {
102            config,
103            batched_webhooks: Arc::new(Mutex::new(AHashMap::new())),
104            queue_manager: None,
105            app_manager,
106        };
107
108        if integration.config.enabled {
109            if let Some(qm) = queue_manager {
110                integration.setup_webhook_processor(qm).await?;
111            } else {
112                warn!(
113                    "Webhooks are enabled but no queue manager provided, webhooks will be disabled"
114                );
115                integration.config.enabled = false;
116            }
117        }
118
119        if integration.config.enabled && integration.config.batching.enabled {
120            integration.start_batching_task();
121        }
122
123        Ok(integration)
124    }
125
126    async fn setup_webhook_processor(&mut self, queue_manager: Arc<QueueManager>) -> Result<()> {
127        let webhook_sender = Arc::new(WebhookSender::new(self.app_manager.clone()));
128        let queue_name = "webhooks".to_string();
129        let sender_clone = webhook_sender.clone();
130
131        let processor: JobProcessorFnAsync = Box::new(move |job_data| {
132            let sender_for_task = sender_clone.clone();
133            Box::pin(async move {
134                info!(
135                    "{}",
136                    format!("Processing webhook job from queue: {:?}", job_data.app_id)
137                );
138                sender_for_task.process_webhook_job(job_data).await
139            })
140        });
141
142        queue_manager.process_queue(&queue_name, processor).await?;
143        self.queue_manager = Some(queue_manager);
144        Ok(())
145    }
146
147    fn start_batching_task(&self) {
148        if !self.config.batching.enabled {
149            return;
150        }
151        let queue_manager_clone = self.queue_manager.clone();
152        let batched_webhooks_clone = self.batched_webhooks.clone();
153        let batch_duration = self.config.batching.duration;
154        let batch_size = self.config.batching.size.max(1);
155
156        tokio::spawn(async move {
157            let mut interval = interval(Duration::from_millis(batch_duration));
158            loop {
159                interval.tick().await;
160                let webhooks_to_process: AHashMap<String, Vec<JobData>> = {
161                    let mut batched = batched_webhooks_clone.lock().await;
162                    std::mem::take(&mut *batched)
163                };
164
165                if webhooks_to_process.is_empty() {
166                    continue;
167                }
168                info!(
169                    "{}",
170                    format!(
171                        "Processing {} batched webhook queues (Sockudo internal batching)",
172                        webhooks_to_process.len()
173                    )
174                );
175
176                if let Some(qm) = &queue_manager_clone {
177                    for (queue_name, jobs) in webhooks_to_process {
178                        for batch in Self::merge_jobs_for_queue(jobs, batch_size) {
179                            if let Err(e) = qm.add_to_queue(&queue_name, batch).await {
180                                error!(
181                                    "{}",
182                                    format!(
183                                        "Failed to add batched job to queue {}: {}",
184                                        queue_name, e
185                                    )
186                                );
187                            }
188                        }
189                    }
190                }
191            }
192        });
193    }
194
195    pub fn is_enabled(&self) -> bool {
196        self.config.enabled
197    }
198
199    async fn add_webhook(&self, queue_name: &str, job_data: JobData) -> Result<()> {
200        if !self.is_enabled() {
201            return Ok(());
202        }
203        if self.config.batching.enabled {
204            let mut batched = self.batched_webhooks.lock().await;
205            batched
206                .entry(queue_name.to_string())
207                .or_default()
208                .push(job_data);
209        } else if let Some(qm) = &self.queue_manager {
210            qm.add_to_queue(queue_name, job_data).await?;
211        } else {
212            return Err(Error::Internal(
213                "Queue manager not initialized for webhooks".to_string(),
214            ));
215        }
216        Ok(())
217    }
218
219    fn merge_jobs_for_queue(jobs: Vec<JobData>, batch_size: usize) -> Vec<JobData> {
220        let mut merged = Vec::new();
221        let mut current: Option<JobData> = None;
222        let batch_size = batch_size.max(1);
223
224        for job in jobs {
225            for chunk in Self::split_job_by_size(job, batch_size) {
226                match current.as_mut() {
227                    Some(existing)
228                        if existing.app_id == chunk.app_id
229                            && existing.app_key == chunk.app_key
230                            && existing.app_secret == chunk.app_secret
231                            && existing.payload.events.len() + chunk.payload.events.len()
232                                <= batch_size =>
233                    {
234                        existing.payload.time_ms =
235                            existing.payload.time_ms.min(chunk.payload.time_ms);
236                        existing.payload.events.extend(chunk.payload.events);
237                    }
238                    Some(_) => {
239                        if let Some(finished) = current.take() {
240                            merged.push(finished);
241                        }
242                        current = Some(chunk);
243                    }
244                    None => current = Some(chunk),
245                }
246            }
247        }
248
249        if let Some(finished) = current {
250            merged.push(finished);
251        }
252
253        merged
254    }
255
256    fn split_job_by_size(job: JobData, batch_size: usize) -> Vec<JobData> {
257        let batch_size = batch_size.max(1);
258        let JobData {
259            app_key,
260            app_id,
261            app_secret,
262            payload,
263            original_signature,
264        } = job;
265
266        let JobPayload { time_ms, events } = payload;
267        let mut chunks = Vec::new();
268
269        for event_chunk in events.chunks(batch_size) {
270            chunks.push(JobData {
271                app_key: app_key.clone(),
272                app_id: app_id.clone(),
273                app_secret: app_secret.clone(),
274                payload: JobPayload {
275                    time_ms,
276                    events: event_chunk.to_vec(),
277                },
278                original_signature: original_signature.clone(),
279            });
280        }
281
282        if chunks.is_empty() {
283            chunks.push(JobData {
284                app_key,
285                app_id,
286                app_secret,
287                payload: JobPayload {
288                    time_ms,
289                    events: Vec::new(),
290                },
291                original_signature,
292            });
293        }
294
295        chunks
296    }
297
298    fn create_job_data(
299        &self,
300        app: &App,
301        events_payload: Vec<Value>,
302        original_signature_for_queue: &str,
303    ) -> JobData {
304        let job_payload = JobPayload {
305            time_ms: chrono::Utc::now().timestamp_millis(),
306            events: events_payload,
307        };
308        JobData {
309            app_key: app.key.clone(),
310            app_id: app.id.clone(),
311            app_secret: app.secret.clone(),
312            payload: job_payload,
313            original_signature: original_signature_for_queue.to_string(),
314        }
315    }
316
317    async fn should_send_webhook(&self, app: &App, event_type_name: &str) -> bool {
318        if !self.is_enabled() {
319            return false;
320        }
321        app.webhooks.as_ref().is_some_and(|webhooks| {
322            webhooks
323                .iter()
324                .any(|wh_config| wh_config.event_types.contains(&event_type_name.to_string()))
325        })
326    }
327
328    pub async fn send_channel_occupied(&self, app: &App, channel: &str) -> Result<()> {
329        if !self.should_send_webhook(app, "channel_occupied").await {
330            return Ok(());
331        }
332        let event_obj = json!({
333            "name": "channel_occupied",
334            "channel": channel
335        });
336        let signature = format!("{}:{}:channel_occupied", app.id, channel);
337        let job_data = self.create_job_data(app, vec![event_obj], &signature);
338
339        self.add_webhook("webhooks", job_data).await
340    }
341
342    pub async fn send_channel_vacated(&self, app: &App, channel: &str) -> Result<()> {
343        if !self.should_send_webhook(app, "channel_vacated").await {
344            return Ok(());
345        }
346        let event_obj = json!({
347            "name": "channel_vacated",
348            "channel": channel
349        });
350        let signature = format!("{}:{}:channel_vacated", app.id, channel);
351        let job_data = self.create_job_data(app, vec![event_obj], &signature);
352        self.add_webhook("webhooks", job_data).await
353    }
354
355    pub async fn send_member_added(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
356        if !self.should_send_webhook(app, "member_added").await {
357            return Ok(());
358        }
359        let event_obj = json!({
360            "name": "member_added",
361            "channel": channel,
362            "user_id": user_id
363        });
364        let signature = format!("{}:{}:{}:member_added", app.id, channel, user_id);
365        let job_data = self.create_job_data(app, vec![event_obj], &signature);
366        self.add_webhook("webhooks", job_data).await
367    }
368
369    pub async fn send_member_removed(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
370        if !self.should_send_webhook(app, "member_removed").await {
371            return Ok(());
372        }
373        let event_obj = json!({
374            "name": "member_removed",
375            "channel": channel,
376            "user_id": user_id
377        });
378        let signature = format!("{}:{}:{}:member_removed", app.id, channel, user_id);
379        let job_data = self.create_job_data(app, vec![event_obj], &signature);
380        self.add_webhook("webhooks", job_data).await
381    }
382
383    pub async fn send_client_event(
384        &self,
385        app: &App,
386        channel: &str,
387        event_name: &str,
388        event_data: Value,
389        socket_id: Option<&str>,
390        user_id: Option<&str>,
391    ) -> Result<()> {
392        if !self.should_send_webhook(app, "client_event").await {
393            return Ok(());
394        }
395
396        let mut client_event_pusher_payload = json!({
397            "name": "client_event",
398            "channel": channel,
399            "event": event_name,
400            "data": event_data,
401            "socket_id": socket_id,
402        });
403
404        if channel.starts_with("presence-")
405            && let Some(uid) = user_id
406        {
407            client_event_pusher_payload["user_id"] = json!(uid);
408        }
409
410        let signature = format!(
411            "{}:{}:{}:client_event",
412            app.id,
413            channel,
414            socket_id.unwrap_or("unknown")
415        );
416        let job_data = self.create_job_data(app, vec![client_event_pusher_payload], &signature);
417        self.add_webhook("webhooks", job_data).await
418    }
419
420    pub async fn send_cache_missed(&self, app: &App, channel: &str) -> Result<()> {
421        if !self.should_send_webhook(app, "cache_miss").await {
422            return Ok(());
423        }
424        let event_obj = json!({
425            "name": "cache_miss",
426            "channel": channel,
427            "data" : "{}"
428        });
429        let signature = format!("{}:{}:cache_miss", app.id, channel);
430        let job_data = self.create_job_data(app, vec![event_obj], &signature);
431        self.add_webhook("webhooks", job_data).await
432    }
433
434    /// Sends a webhook when the subscription count for a channel changes.
435    pub async fn send_subscription_count_changed(
436        &self,
437        app: &App,
438        channel: &str,
439        subscription_count: usize,
440    ) -> Result<()> {
441        if !self.should_send_webhook(app, "subscription_count").await {
442            return Ok(());
443        }
444
445        let event_obj = json!({
446            "name": "subscription_count",
447            "channel": channel,
448            "subscription_count": subscription_count
449        });
450
451        let signature = format!(
452            "{}:{}:subscription_count:{}",
453            app.id, channel, subscription_count
454        );
455
456        let job_data = self.create_job_data(app, vec![event_obj], &signature);
457        self.add_webhook("webhooks", job_data).await
458    }
459
460    /// Check the health of the queue manager used by webhook integration
461    pub async fn check_queue_health(&self) -> Result<()> {
462        if let Some(qm) = &self.queue_manager {
463            qm.check_health().await
464        } else {
465            Ok(())
466        }
467    }
468}