1use sockudo_core::app::App;
2use sockudo_core::app::AppManager;
3use sockudo_core::error::{Error, Result};
4use sockudo_core::queue::QueueInterface;
5use sockudo_core::webhook_types::{JobData, JobPayload, JobProcessorFnAsync};
6
7use crate::sender::WebhookSender;
8use ahash::AHashMap;
9use serde::{Deserialize, Serialize};
10use sonic_rs::{Value, json};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::Mutex;
14use tokio::time::interval;
15use tracing::{error, info, warn};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct WebhookConfig {
20 pub enabled: bool,
21 pub batching: BatchingConfig,
22 pub process_id: String,
23 pub debug: bool,
24}
25
26impl Default for WebhookConfig {
27 fn default() -> Self {
28 Self {
29 enabled: true,
30 batching: BatchingConfig::default(),
31 process_id: uuid::Uuid::new_v4().to_string(),
32 debug: false,
33 }
34 }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BatchingConfig {
40 pub enabled: bool,
41 pub duration: u64, pub size: usize,
43}
44
45impl Default for BatchingConfig {
46 fn default() -> Self {
47 Self {
48 enabled: false,
49 duration: 50,
50 size: 100,
51 }
52 }
53}
54
55pub struct QueueManager {
58 driver: Box<dyn QueueInterface>,
59}
60
61impl QueueManager {
62 pub fn new(driver: Box<dyn QueueInterface>) -> Self {
63 Self { driver }
64 }
65
66 pub async fn add_to_queue(&self, queue_name: &str, data: JobData) -> Result<()> {
67 self.driver.add_to_queue(queue_name, data).await
68 }
69
70 pub async fn process_queue(
71 &self,
72 queue_name: &str,
73 callback: JobProcessorFnAsync,
74 ) -> Result<()> {
75 self.driver.process_queue(queue_name, callback).await
76 }
77
78 pub async fn disconnect(&self) -> Result<()> {
79 self.driver.disconnect().await
80 }
81
82 pub async fn check_health(&self) -> Result<()> {
83 self.driver.check_health().await
84 }
85}
86
87pub struct WebhookIntegration {
89 config: WebhookConfig,
90 batched_webhooks: Arc<Mutex<AHashMap<String, Vec<JobData>>>>,
91 queue_manager: Option<Arc<QueueManager>>,
92 app_manager: Arc<dyn AppManager + Send + Sync>,
93}
94
95impl WebhookIntegration {
96 pub async fn new(
97 config: WebhookConfig,
98 app_manager: Arc<dyn AppManager + Send + Sync>,
99 queue_manager: Option<Arc<QueueManager>>,
100 ) -> Result<Self> {
101 let mut integration = Self {
102 config,
103 batched_webhooks: Arc::new(Mutex::new(AHashMap::new())),
104 queue_manager: None,
105 app_manager,
106 };
107
108 if integration.config.enabled {
109 if let Some(qm) = queue_manager {
110 integration.setup_webhook_processor(qm).await?;
111 } else {
112 warn!(
113 "Webhooks are enabled but no queue manager provided, webhooks will be disabled"
114 );
115 integration.config.enabled = false;
116 }
117 }
118
119 if integration.config.enabled && integration.config.batching.enabled {
120 integration.start_batching_task();
121 }
122
123 Ok(integration)
124 }
125
126 async fn setup_webhook_processor(&mut self, queue_manager: Arc<QueueManager>) -> Result<()> {
127 let webhook_sender = Arc::new(WebhookSender::new(self.app_manager.clone()));
128 let queue_name = "webhooks".to_string();
129 let sender_clone = webhook_sender.clone();
130
131 let processor: JobProcessorFnAsync = Box::new(move |job_data| {
132 let sender_for_task = sender_clone.clone();
133 Box::pin(async move {
134 info!(
135 "{}",
136 format!("Processing webhook job from queue: {:?}", job_data.app_id)
137 );
138 sender_for_task.process_webhook_job(job_data).await
139 })
140 });
141
142 queue_manager.process_queue(&queue_name, processor).await?;
143 self.queue_manager = Some(queue_manager);
144 Ok(())
145 }
146
147 fn start_batching_task(&self) {
148 if !self.config.batching.enabled {
149 return;
150 }
151 let queue_manager_clone = self.queue_manager.clone();
152 let batched_webhooks_clone = self.batched_webhooks.clone();
153 let batch_duration = self.config.batching.duration;
154 let batch_size = self.config.batching.size.max(1);
155
156 tokio::spawn(async move {
157 let mut interval = interval(Duration::from_millis(batch_duration));
158 loop {
159 interval.tick().await;
160 let webhooks_to_process: AHashMap<String, Vec<JobData>> = {
161 let mut batched = batched_webhooks_clone.lock().await;
162 std::mem::take(&mut *batched)
163 };
164
165 if webhooks_to_process.is_empty() {
166 continue;
167 }
168 info!(
169 "{}",
170 format!(
171 "Processing {} batched webhook queues (Sockudo internal batching)",
172 webhooks_to_process.len()
173 )
174 );
175
176 if let Some(qm) = &queue_manager_clone {
177 for (queue_name, jobs) in webhooks_to_process {
178 for batch in Self::merge_jobs_for_queue(jobs, batch_size) {
179 if let Err(e) = qm.add_to_queue(&queue_name, batch).await {
180 error!(
181 "{}",
182 format!(
183 "Failed to add batched job to queue {}: {}",
184 queue_name, e
185 )
186 );
187 }
188 }
189 }
190 }
191 }
192 });
193 }
194
195 pub fn is_enabled(&self) -> bool {
196 self.config.enabled
197 }
198
199 async fn add_webhook(&self, queue_name: &str, job_data: JobData) -> Result<()> {
200 if !self.is_enabled() {
201 return Ok(());
202 }
203 if self.config.batching.enabled {
204 let mut batched = self.batched_webhooks.lock().await;
205 batched
206 .entry(queue_name.to_string())
207 .or_default()
208 .push(job_data);
209 } else if let Some(qm) = &self.queue_manager {
210 qm.add_to_queue(queue_name, job_data).await?;
211 } else {
212 return Err(Error::Internal(
213 "Queue manager not initialized for webhooks".to_string(),
214 ));
215 }
216 Ok(())
217 }
218
219 fn merge_jobs_for_queue(jobs: Vec<JobData>, batch_size: usize) -> Vec<JobData> {
220 let mut merged = Vec::new();
221 let mut current: Option<JobData> = None;
222 let batch_size = batch_size.max(1);
223
224 for job in jobs {
225 for chunk in Self::split_job_by_size(job, batch_size) {
226 match current.as_mut() {
227 Some(existing)
228 if existing.app_id == chunk.app_id
229 && existing.app_key == chunk.app_key
230 && existing.app_secret == chunk.app_secret
231 && existing.payload.events.len() + chunk.payload.events.len()
232 <= batch_size =>
233 {
234 existing.payload.time_ms =
235 existing.payload.time_ms.min(chunk.payload.time_ms);
236 existing.payload.events.extend(chunk.payload.events);
237 }
238 Some(_) => {
239 if let Some(finished) = current.take() {
240 merged.push(finished);
241 }
242 current = Some(chunk);
243 }
244 None => current = Some(chunk),
245 }
246 }
247 }
248
249 if let Some(finished) = current {
250 merged.push(finished);
251 }
252
253 merged
254 }
255
256 fn split_job_by_size(job: JobData, batch_size: usize) -> Vec<JobData> {
257 let batch_size = batch_size.max(1);
258 let JobData {
259 app_key,
260 app_id,
261 app_secret,
262 payload,
263 original_signature,
264 } = job;
265
266 let JobPayload { time_ms, events } = payload;
267 let mut chunks = Vec::new();
268
269 for event_chunk in events.chunks(batch_size) {
270 chunks.push(JobData {
271 app_key: app_key.clone(),
272 app_id: app_id.clone(),
273 app_secret: app_secret.clone(),
274 payload: JobPayload {
275 time_ms,
276 events: event_chunk.to_vec(),
277 },
278 original_signature: original_signature.clone(),
279 });
280 }
281
282 if chunks.is_empty() {
283 chunks.push(JobData {
284 app_key,
285 app_id,
286 app_secret,
287 payload: JobPayload {
288 time_ms,
289 events: Vec::new(),
290 },
291 original_signature,
292 });
293 }
294
295 chunks
296 }
297
298 fn create_job_data(
299 &self,
300 app: &App,
301 events_payload: Vec<Value>,
302 original_signature_for_queue: &str,
303 ) -> JobData {
304 let job_payload = JobPayload {
305 time_ms: chrono::Utc::now().timestamp_millis(),
306 events: events_payload,
307 };
308 JobData {
309 app_key: app.key.clone(),
310 app_id: app.id.clone(),
311 app_secret: app.secret.clone(),
312 payload: job_payload,
313 original_signature: original_signature_for_queue.to_string(),
314 }
315 }
316
317 async fn should_send_webhook(&self, app: &App, event_type_name: &str) -> bool {
318 if !self.is_enabled() {
319 return false;
320 }
321 app.webhooks.as_ref().is_some_and(|webhooks| {
322 webhooks
323 .iter()
324 .any(|wh_config| wh_config.event_types.contains(&event_type_name.to_string()))
325 })
326 }
327
328 pub async fn send_channel_occupied(&self, app: &App, channel: &str) -> Result<()> {
329 if !self.should_send_webhook(app, "channel_occupied").await {
330 return Ok(());
331 }
332 let event_obj = json!({
333 "name": "channel_occupied",
334 "channel": channel
335 });
336 let signature = format!("{}:{}:channel_occupied", app.id, channel);
337 let job_data = self.create_job_data(app, vec![event_obj], &signature);
338
339 self.add_webhook("webhooks", job_data).await
340 }
341
342 pub async fn send_channel_vacated(&self, app: &App, channel: &str) -> Result<()> {
343 if !self.should_send_webhook(app, "channel_vacated").await {
344 return Ok(());
345 }
346 let event_obj = json!({
347 "name": "channel_vacated",
348 "channel": channel
349 });
350 let signature = format!("{}:{}:channel_vacated", app.id, channel);
351 let job_data = self.create_job_data(app, vec![event_obj], &signature);
352 self.add_webhook("webhooks", job_data).await
353 }
354
355 pub async fn send_member_added(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
356 if !self.should_send_webhook(app, "member_added").await {
357 return Ok(());
358 }
359 let event_obj = json!({
360 "name": "member_added",
361 "channel": channel,
362 "user_id": user_id
363 });
364 let signature = format!("{}:{}:{}:member_added", app.id, channel, user_id);
365 let job_data = self.create_job_data(app, vec![event_obj], &signature);
366 self.add_webhook("webhooks", job_data).await
367 }
368
369 pub async fn send_member_removed(&self, app: &App, channel: &str, user_id: &str) -> Result<()> {
370 if !self.should_send_webhook(app, "member_removed").await {
371 return Ok(());
372 }
373 let event_obj = json!({
374 "name": "member_removed",
375 "channel": channel,
376 "user_id": user_id
377 });
378 let signature = format!("{}:{}:{}:member_removed", app.id, channel, user_id);
379 let job_data = self.create_job_data(app, vec![event_obj], &signature);
380 self.add_webhook("webhooks", job_data).await
381 }
382
383 pub async fn send_client_event(
384 &self,
385 app: &App,
386 channel: &str,
387 event_name: &str,
388 event_data: Value,
389 socket_id: Option<&str>,
390 user_id: Option<&str>,
391 ) -> Result<()> {
392 if !self.should_send_webhook(app, "client_event").await {
393 return Ok(());
394 }
395
396 let mut client_event_pusher_payload = json!({
397 "name": "client_event",
398 "channel": channel,
399 "event": event_name,
400 "data": event_data,
401 "socket_id": socket_id,
402 });
403
404 if channel.starts_with("presence-")
405 && let Some(uid) = user_id
406 {
407 client_event_pusher_payload["user_id"] = json!(uid);
408 }
409
410 let signature = format!(
411 "{}:{}:{}:client_event",
412 app.id,
413 channel,
414 socket_id.unwrap_or("unknown")
415 );
416 let job_data = self.create_job_data(app, vec![client_event_pusher_payload], &signature);
417 self.add_webhook("webhooks", job_data).await
418 }
419
420 pub async fn send_cache_missed(&self, app: &App, channel: &str) -> Result<()> {
421 if !self.should_send_webhook(app, "cache_miss").await {
422 return Ok(());
423 }
424 let event_obj = json!({
425 "name": "cache_miss",
426 "channel": channel,
427 "data" : "{}"
428 });
429 let signature = format!("{}:{}:cache_miss", app.id, channel);
430 let job_data = self.create_job_data(app, vec![event_obj], &signature);
431 self.add_webhook("webhooks", job_data).await
432 }
433
434 pub async fn send_subscription_count_changed(
436 &self,
437 app: &App,
438 channel: &str,
439 subscription_count: usize,
440 ) -> Result<()> {
441 if !self.should_send_webhook(app, "subscription_count").await {
442 return Ok(());
443 }
444
445 let event_obj = json!({
446 "name": "subscription_count",
447 "channel": channel,
448 "subscription_count": subscription_count
449 });
450
451 let signature = format!(
452 "{}:{}:subscription_count:{}",
453 app.id, channel, subscription_count
454 );
455
456 let job_data = self.create_job_data(app, vec![event_obj], &signature);
457 self.add_webhook("webhooks", job_data).await
458 }
459
460 pub async fn check_queue_health(&self) -> Result<()> {
462 if let Some(qm) = &self.queue_manager {
463 qm.check_health().await
464 } else {
465 Ok(())
466 }
467 }
468}