hyperlane_plugin_websocket/websocket/impl.rs
1use crate::*;
2
3/// Allows `String` to be used as a broadcast identifier.
4impl BroadcastTypeTrait for String {}
5/// Allows string slices to be used as broadcast identifiers.
6impl BroadcastTypeTrait for &str {}
7/// Allows `char` to be used as a broadcast identifier.
8impl BroadcastTypeTrait for char {}
9/// Allows `bool` to be used as a broadcast identifier.
10impl BroadcastTypeTrait for bool {}
11/// Allows `i8` to be used as a broadcast identifier.
12impl BroadcastTypeTrait for i8 {}
13/// Allows `i16` to be used as a broadcast identifier.
14impl BroadcastTypeTrait for i16 {}
15/// Allows `i32` to be used as a broadcast identifier.
16impl BroadcastTypeTrait for i32 {}
17/// Allows `i64` to be used as a broadcast identifier.
18impl BroadcastTypeTrait for i64 {}
19/// Allows `i128` to be used as a broadcast identifier.
20impl BroadcastTypeTrait for i128 {}
21/// Allows `isize` to be used as a broadcast identifier.
22impl BroadcastTypeTrait for isize {}
23/// Allows `u8` to be used as a broadcast identifier.
24impl BroadcastTypeTrait for u8 {}
25/// Allows `u16` to be used as a broadcast identifier.
26impl BroadcastTypeTrait for u16 {}
27/// Allows `u32` to be used as a broadcast identifier.
28impl BroadcastTypeTrait for u32 {}
29/// Allows `u64` to be used as a broadcast identifier.
30impl BroadcastTypeTrait for u64 {}
31/// Allows `u128` to be used as a broadcast identifier.
32impl BroadcastTypeTrait for u128 {}
33/// Allows `usize` to be used as a broadcast identifier.
34impl BroadcastTypeTrait for usize {}
35/// Allows `f32` to be used as a broadcast identifier.
36impl BroadcastTypeTrait for f32 {}
37/// Allows `f64` to be used as a broadcast identifier.
38impl BroadcastTypeTrait for f64 {}
39/// Allows `IpAddr` to be used as a broadcast identifier.
40impl BroadcastTypeTrait for IpAddr {}
41/// Allows `Ipv4Addr` to be used as a broadcast identifier.
42impl BroadcastTypeTrait for Ipv4Addr {}
43/// Allows `Ipv6Addr` to be used as a broadcast identifier.
44impl BroadcastTypeTrait for Ipv6Addr {}
45/// Allows `SocketAddr` to be used as a broadcast identifier.
46impl BroadcastTypeTrait for SocketAddr {}
47/// Allows `NonZeroU8` to be used as a broadcast identifier.
48impl BroadcastTypeTrait for NonZeroU8 {}
49/// Allows `NonZeroU16` to be used as a broadcast identifier.
50impl BroadcastTypeTrait for NonZeroU16 {}
51/// Allows `NonZeroU32` to be used as a broadcast identifier.
52impl BroadcastTypeTrait for NonZeroU32 {}
53/// Allows `NonZeroU64` to be used as a broadcast identifier.
54impl BroadcastTypeTrait for NonZeroU64 {}
55/// Allows `NonZeroU128` to be used as a broadcast identifier.
56impl BroadcastTypeTrait for NonZeroU128 {}
57/// Allows `NonZeroUsize` to be used as a broadcast identifier.
58impl BroadcastTypeTrait for NonZeroUsize {}
59/// Allows `NonZeroI8` to be used as a broadcast identifier.
60impl BroadcastTypeTrait for NonZeroI8 {}
61/// Allows `NonZeroI16` to be used as a broadcast identifier.
62impl BroadcastTypeTrait for NonZeroI16 {}
63/// Allows `NonZeroI32` to be used as a broadcast identifier.
64impl BroadcastTypeTrait for NonZeroI32 {}
65/// Allows `NonZeroI64` to be used as a broadcast identifier.
66impl BroadcastTypeTrait for NonZeroI64 {}
67/// Allows `NonZeroI128` to be used as a broadcast identifier.
68impl BroadcastTypeTrait for NonZeroI128 {}
69/// Allows `NonZeroIsize` to be used as a broadcast identifier.
70impl BroadcastTypeTrait for NonZeroIsize {}
71/// Allows `Infallible` to be used as a broadcast identifier.
72impl BroadcastTypeTrait for Infallible {}
73
74/// Allows references to `String` to be used as broadcast identifiers.
75impl BroadcastTypeTrait for &String {}
76/// Allows double references to string slices to be used as broadcast identifiers.
77impl BroadcastTypeTrait for &&str {}
78/// Allows references to `char` to be used as broadcast identifiers.
79impl BroadcastTypeTrait for &char {}
80/// Allows references to `bool` to be used as broadcast identifiers.
81impl BroadcastTypeTrait for &bool {}
82/// Allows references to `i8` to be used as broadcast identifiers.
83impl BroadcastTypeTrait for &i8 {}
84/// Allows references to `i16` to be used as broadcast identifiers.
85impl BroadcastTypeTrait for &i16 {}
86/// Allows references to `i32` to be used as broadcast identifiers.
87impl BroadcastTypeTrait for &i32 {}
88/// Allows references to `i64` to be used as broadcast identifiers.
89impl BroadcastTypeTrait for &i64 {}
90/// Allows references to `i128` to be used as broadcast identifiers.
91impl BroadcastTypeTrait for &i128 {}
92/// Allows references to `isize` to be used as broadcast identifiers.
93impl BroadcastTypeTrait for &isize {}
94/// Allows references to `u8` to be used as broadcast identifiers.
95impl BroadcastTypeTrait for &u8 {}
96/// Allows references to `u16` to be used as broadcast identifiers.
97impl BroadcastTypeTrait for &u16 {}
98/// Allows references to `u32` to be used as broadcast identifiers.
99impl BroadcastTypeTrait for &u32 {}
100/// Allows references to `u64` to be used as
101/// Implements `BroadcastTypeTrait` for `&u128`.
102///
103/// This allows references to `u128` to be used as a broadcast identifier.
104impl BroadcastTypeTrait for &u128 {}
105/// Implements `BroadcastTypeTrait` for `&usize`.
106///
107/// This allows references to `usize` to be used as a broadcast identifier.
108impl BroadcastTypeTrait for &usize {}
109/// Implements `BroadcastTypeTrait` for `&f32`.
110///
111/// This allows references to `f32` to be used as a broadcast identifier.
112impl BroadcastTypeTrait for &f32 {}
113/// Implements `BroadcastTypeTrait` for `&f64`.
114///
115/// This allows references to `f64` to be used as a broadcast identifier.
116impl BroadcastTypeTrait for &f64 {}
117/// Implements `BroadcastTypeTrait` for `&IpAddr`.
118///
119/// This allows references to `IpAddr` to be used as a broadcast identifier.
120impl BroadcastTypeTrait for &IpAddr {}
121/// Implements `BroadcastTypeTrait` for `&Ipv4Addr`.
122///
123/// This allows references to `Ipv4Addr` to be used as a broadcast identifier.
124impl BroadcastTypeTrait for &Ipv4Addr {}
125/// Implements `BroadcastTypeTrait` for `&Ipv6Addr`.
126///
127/// This allows references to `Ipv6Addr` to be used as a broadcast identifier.
128impl BroadcastTypeTrait for &Ipv6Addr {}
129/// Implements `BroadcastTypeTrait` for `&SocketAddr`.
130///
131/// This allows references to `SocketAddr` to be used as a broadcast identifier.
132impl BroadcastTypeTrait for &SocketAddr {}
133/// Implements `BroadcastTypeTrait` for `&NonZeroU8`.
134///
135/// This allows references to `NonZeroU8` to be used as a broadcast identifier.
136impl BroadcastTypeTrait for &NonZeroU8 {}
137/// Implements `BroadcastTypeTrait` for `&NonZeroU16`.
138///
139/// This allows references to `NonZeroU16` to be used as a broadcast identifier.
140impl BroadcastTypeTrait for &NonZeroU16 {}
141/// Implements `BroadcastTypeTrait` for `&NonZeroU32`.
142///
143/// This allows references to `NonZeroU32` to be used as a broadcast identifier.
144impl BroadcastTypeTrait for &NonZeroU32 {}
145/// Implements `BroadcastTypeTrait` for `&NonZeroU64`.
146///
147/// This allows references to `NonZeroU64` to be used as a broadcast identifier.
148impl BroadcastTypeTrait for &NonZeroU64 {}
149/// Implements `BroadcastTypeTrait` for `&NonZeroU128`.
150///
151/// This allows references to `NonZeroU128` to be used as a broadcast identifier.
152impl BroadcastTypeTrait for &NonZeroU128 {}
153/// Implements `BroadcastTypeTrait` for `&NonZeroUsize`.
154///
155/// This allows references to `NonZeroUsize` to be used as a broadcast identifier.
156impl BroadcastTypeTrait for &NonZeroUsize {}
157/// Implements `BroadcastTypeTrait` for `&NonZeroI8`.
158///
159/// This allows references to `NonZeroI8` to be used as a broadcast identifier.
160impl BroadcastTypeTrait for &NonZeroI8 {}
161/// Implements `BroadcastTypeTrait` for `&NonZeroI16`.
162///
163/// This allows references to `NonZeroI16` to be used as a broadcast identifier.
164impl BroadcastTypeTrait for &NonZeroI16 {}
165/// Implements `BroadcastTypeTrait` for `&NonZeroI32`.
166///
167/// This allows references to `NonZeroI32` to be used as a broadcast identifier.
168impl BroadcastTypeTrait for &NonZeroI32 {}
169/// Implements `BroadcastTypeTrait` for `&NonZeroI64`.
170///
171/// This allows references to `NonZeroI64` to be used as a broadcast identifier.
172impl BroadcastTypeTrait for &NonZeroI64 {}
173/// Implements `BroadcastTypeTrait` for `&NonZeroI128`.
174///
175/// This allows references to `NonZeroI128` to be used as a broadcast identifier.
176impl BroadcastTypeTrait for &NonZeroI128 {}
177/// Implements `BroadcastTypeTrait` for `&NonZeroIsize`.
178///
179/// This allows references to `NonZeroIsize` to be used as a broadcast identifier.
180impl BroadcastTypeTrait for &NonZeroIsize {}
181/// Implements `BroadcastTypeTrait` for `&Infallible`.
182///
183/// This allows references to `Infallible` to be used as a broadcast identifier.
184impl BroadcastTypeTrait for &Infallible {}
185
186/// Implements the `Default` trait for `BroadcastType`.
187///
188/// The default value is `BroadcastType::Unknown`.
189///
190/// # Type Parameters
191///
192/// - `B`: The type parameter for `BroadcastType`, which must implement `BroadcastTypeTrait`.
193impl<B: BroadcastTypeTrait> Default for BroadcastType<B> {
194 fn default() -> Self {
195 BroadcastType::Unknown
196 }
197}
198
199impl<B: BroadcastTypeTrait> BroadcastType<B> {
200 /// Generates a unique key string for a given broadcast type.
201 ///
202 /// For point-to-point types, the keys are sorted to ensure consistent key generation
203 /// regardless of the order of the input keys.
204 ///
205 /// # Arguments
206 ///
207 /// - `BroadcastType<B>` - The broadcast type for which to generate the key.
208 ///
209 /// # Returns
210 ///
211 /// - `String` - The unique key string for the broadcast type.
212 #[inline]
213 pub fn get_key(broadcast_type: BroadcastType<B>) -> String {
214 match broadcast_type {
215 BroadcastType::PointToPoint(key1, key2) => {
216 let (first_key, second_key) = if key1 <= key2 {
217 (key1, key2)
218 } else {
219 (key2, key1)
220 };
221 format!(
222 "{}-{}-{}",
223 POINT_TO_POINT_KEY,
224 first_key.to_string(),
225 second_key.to_string()
226 )
227 }
228 BroadcastType::PointToGroup(key) => {
229 format!("{}-{}", POINT_TO_GROUP_KEY, key.to_string())
230 }
231 BroadcastType::Unknown => String::new(),
232 }
233 }
234}
235
236/// Implements the `Default` trait for `WebSocketConfig`.
237///
238/// Provides a default configuration for WebSocket connections, including
239/// default hook types that do nothing.
240///
241/// # Type Parameters
242///
243/// - `B`: The type parameter for `WebSocketConfig`, which must implement `BroadcastTypeTrait`.
244impl<B: BroadcastTypeTrait> Default for WebSocketConfig<B> {
245 fn default() -> Self {
246 let default_hook: ServerHookHandler = Arc::new(|_ctx| Box::pin(async {}));
247 Self {
248 context: Context::default(),
249 buffer_size: DEFAULT_BUFFER_SIZE,
250 capacity: DEFAULT_BROADCAST_SENDER_CAPACITY,
251 broadcast_type: BroadcastType::default(),
252 connected_hook: default_hook.clone(),
253 request_hook: default_hook.clone(),
254 sended_hook: default_hook.clone(),
255 closed_hook: default_hook,
256 }
257 }
258}
259
260impl<B: BroadcastTypeTrait> WebSocketConfig<B> {
261 /// Creates a new WebSocket configuration with default values.
262 ///
263 /// # Returns
264 ///
265 /// - `WebSocketConfig<B>` - A new WebSocket configuration instance.
266 #[inline]
267 pub fn new() -> Self {
268 Self::default()
269 }
270}
271
272impl<B: BroadcastTypeTrait> WebSocketConfig<B> {
273 /// Sets the buffer size for the WebSocket connection.
274 ///
275 /// # Arguments
276 ///
277 /// - `usize` - The desired buffer size in bytes.
278 ///
279 /// # Returns
280 ///
281 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
282 #[inline]
283 pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
284 self.buffer_size = buffer_size;
285 self
286 }
287
288 /// Sets the capacity for the broadcast sender.
289 ///
290 /// # Arguments
291 ///
292 /// - `Capacity` - The desired capacity.
293 ///
294 /// # Returns
295 ///
296 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
297 #[inline]
298 pub fn set_capacity(mut self, capacity: Capacity) -> Self {
299 self.capacity = capacity;
300 self
301 }
302
303 /// Sets the context for the WebSocket connection.
304 ///
305 /// # Arguments
306 ///
307 /// - `Context` - The context object to associate with the WebSocket.
308 ///
309 /// # Returns
310 ///
311 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
312 #[inline]
313 pub fn set_context(mut self, context: Context) -> Self {
314 self.context = context;
315 self
316 }
317
318 /// Sets the broadcast type for the WebSocket connection.
319 ///
320 /// # Arguments
321 ///
322 /// - `BroadcastType<B>` - The broadcast type to use for this WebSocket.
323 ///
324 /// # Returns
325 ///
326 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
327 #[inline]
328 pub fn set_broadcast_type(mut self, broadcast_type: BroadcastType<B>) -> Self {
329 self.broadcast_type = broadcast_type;
330 self
331 }
332
333 /// Retrieves a reference to the context associated with this configuration.
334 ///
335 /// # Returns
336 ///
337 /// - `&Context` - A reference to the context object.
338 #[inline]
339 pub fn get_context(&self) -> &Context {
340 &self.context
341 }
342
343 /// Retrieves the buffer size configured for the WebSocket connection.
344 ///
345 /// # Returns
346 ///
347 /// - `usize` - The buffer size in bytes.
348 #[inline]
349 pub fn get_buffer_size(&self) -> usize {
350 self.buffer_size
351 }
352
353 /// Retrieves the capacity configured for the broadcast sender.
354 ///
355 /// # Returns
356 ///
357 /// - `Capacity` - The capacity.
358 #[inline]
359 pub fn get_capacity(&self) -> Capacity {
360 self.capacity
361 }
362
363 /// Retrieves a reference to the broadcast type configured for this WebSocket.
364 ///
365 /// # Returns
366 ///
367 /// - `&BroadcastType<B>` - A reference to the broadcast type object.
368 #[inline]
369 pub fn get_broadcast_type(&self) -> &BroadcastType<B> {
370 &self.broadcast_type
371 }
372
373 /// Sets the connected hook handler.
374 ///
375 /// This hook is executed when the WebSocket connection is established.
376 ///
377 /// # Type Parameters
378 ///
379 /// - `S`: The hook type, which must implement `ServerHook`.
380 ///
381 /// # Returns
382 ///
383 /// The modified `WebSocketConfig` instance.
384 ///
385 /// # Examples
386 ///
387 /// ```rust,ignore
388 /// struct MyConnectedHook;
389 /// impl ServerHook for MyConnectedHook {
390 /// async fn new(_ctx: &Context) -> Self { Self }
391 /// async fn handle(self, ctx: &Context) { /* ... */ }
392 /// }
393 ///
394 /// let config = WebSocketConfig::new()
395 /// .set_connected_hook::<MyConnectedHook>();
396 /// ```
397 #[inline]
398 pub fn set_connected_hook<S>(mut self) -> Self
399 where
400 S: ServerHook,
401 {
402 self.connected_hook = Arc::new(|ctx| {
403 let ctx: Context = ctx.clone();
404 Box::pin(async move {
405 S::new(&ctx).await.handle(&ctx).await;
406 })
407 });
408 self
409 }
410
411 /// Sets the request hook handler.
412 ///
413 /// This hook is executed when a new request is received on the WebSocket.
414 ///
415 /// # Type Parameters
416 ///
417 /// - `S`: The hook type, which must implement `ServerHook`.
418 ///
419 /// # Returns
420 ///
421 /// The modified `WebSocketConfig` instance.
422 ///
423 /// # Examples
424 ///
425 /// ```rust,ignore
426 /// struct MyRequestHook;
427 /// impl ServerHook for MyRequestHook {
428 /// async fn new(_ctx: &Context) -> Self { Self }
429 /// async fn handle(self, ctx: &Context) { /* ... */ }
430 /// }
431 ///
432 /// let config = WebSocketConfig::new()
433 /// .set_request_hook::<MyRequestHook>();
434 /// ```
435 #[inline]
436 pub fn set_request_hook<S>(mut self) -> Self
437 where
438 S: ServerHook,
439 {
440 self.request_hook = Arc::new(|ctx| {
441 let ctx: Context = ctx.clone();
442 Box::pin(async move {
443 S::new(&ctx).await.handle(&ctx).await;
444 })
445 });
446 self
447 }
448
449 /// Sets the sended hook handler.
450 ///
451 /// This hook is executed after a message has been successfully sent over the WebSocket.
452 ///
453 /// # Type Parameters
454 ///
455 /// - `S`: The hook type, which must implement `ServerHook`.
456 ///
457 /// # Returns
458 ///
459 /// The modified `WebSocketConfig` instance.
460 ///
461 /// # Examples
462 ///
463 /// ```rust,ignore
464 /// struct MySendedHook;
465 /// impl ServerHook for MySendedHook {
466 /// async fn new(_ctx: &Context) -> Self { Self }
467 /// async fn handle(self, ctx: &Context) { /* ... */ }
468 /// }
469 ///
470 /// let config = WebSocketConfig::new()
471 /// .set_sended_hook::<MySendedHook>();
472 /// ```
473 #[inline]
474 pub fn set_sended_hook<S>(mut self) -> Self
475 where
476 S: ServerHook,
477 {
478 self.sended_hook = Arc::new(|ctx| {
479 let ctx: Context = ctx.clone();
480 Box::pin(async move {
481 S::new(&ctx).await.handle(&ctx).await;
482 })
483 });
484 self
485 }
486
487 /// Sets the closed hook handler.
488 ///
489 /// This hook is executed when the WebSocket connection is closed.
490 ///
491 /// # Type Parameters
492 ///
493 /// - `S`: The hook type, which must implement `ServerHook`.
494 ///
495 /// # Returns
496 ///
497 /// The modified `WebSocketConfig` instance.
498 ///
499 /// # Examples
500 ///
501 /// ```rust,ignore
502 /// struct MyClosedHook;
503 /// impl ServerHook for MyClosedHook {
504 /// async fn new(_ctx: &Context) -> Self { Self }
505 /// async fn handle(self, ctx: &Context) { /* ... */ }
506 /// }
507 ///
508 /// let config = WebSocketConfig::new()
509 /// .set_closed_hook::<MyClosedHook>();
510 /// ```
511 #[inline]
512 pub fn set_closed_hook<S>(mut self) -> Self
513 where
514 S: ServerHook,
515 {
516 self.closed_hook = Arc::new(|ctx| {
517 let ctx: Context = ctx.clone();
518 Box::pin(async move {
519 S::new(&ctx).await.handle(&ctx).await;
520 })
521 });
522 self
523 }
524
525 /// Retrieves a reference to the connected hook handler.
526 ///
527 /// # Returns
528 ///
529 /// - `&ServerHookHandler` - A reference to the connected hook handler.
530 #[inline]
531 pub fn get_connected_hook(&self) -> &ServerHookHandler {
532 &self.connected_hook
533 }
534
535 /// Retrieves a reference to the request hook handler.
536 ///
537 /// # Returns
538 ///
539 /// - `&ServerHookHandler` - A reference to the request hook handler.
540 #[inline]
541 pub fn get_request_hook(&self) -> &ServerHookHandler {
542 &self.request_hook
543 }
544
545 /// Retrieves a reference to the sended hook handler.
546 ///
547 /// # Returns
548 ///
549 /// - `&ServerHookHandler` - A reference to the sended hook handler.
550 #[inline]
551 pub fn get_sended_hook(&self) -> &ServerHookHandler {
552 &self.sended_hook
553 }
554
555 /// Retrieves a reference to the closed hook handler.
556 ///
557 /// # Returns
558 ///
559 /// - `&ServerHookHandler` - A reference to the closed hook handler.
560 #[inline]
561 pub fn get_closed_hook(&self) -> &ServerHookHandler {
562 &self.closed_hook
563 }
564}
565
566impl WebSocket {
567 /// Creates a new WebSocket instance.
568 ///
569 /// Initializes with a default broadcast map.
570 ///
571 /// # Returns
572 ///
573 /// - `WebSocket` - A new WebSocket instance.
574 #[inline]
575 pub fn new() -> Self {
576 Self::default()
577 }
578
579 /// Subscribes to a broadcast type or inserts a new one if it doesn't exist.
580 ///
581 /// # Type Parameters
582 ///
583 /// - `B`: The type implementing `BroadcastTypeTrait`.
584 ///
585 /// # Arguments
586 ///
587 /// - `BroadcastType<B>` - The broadcast type to subscribe to.
588 /// - `Capacity` - The capacity for the broadcast sender if a new one is inserted.
589 ///
590 /// # Returns
591 ///
592 /// - `BroadcastMapReceiver<Vec<u8>>` - A broadcast map receiver for the specified broadcast type.
593 #[inline]
594 fn subscribe_unwrap_or_insert<B: BroadcastTypeTrait>(
595 &self,
596 broadcast_type: BroadcastType<B>,
597 capacity: Capacity,
598 ) -> BroadcastMapReceiver<Vec<u8>> {
599 let key: String = BroadcastType::get_key(broadcast_type);
600 self.broadcast_map.subscribe_or_insert(&key, capacity)
601 }
602
603 /// Subscribes to a point-to-point broadcast.
604 ///
605 /// # Type Parameters
606 ///
607 /// - `B`: The type implementing `BroadcastTypeTrait`.
608 ///
609 /// # Arguments
610 ///
611 /// - `&B` - The first identifier for the point-to-point communication.
612 /// - `&B` - The second identifier for the point-to-point communication.
613 /// - `Capacity` - The capacity for the broadcast sender.
614 ///
615 /// # Returns
616 ///
617 /// - `BroadcastMapReceiver<Vec<u8>>` - A broadcast map receiver for the point-to-point broadcast.
618 #[inline]
619 fn point_to_point<B: BroadcastTypeTrait>(
620 &self,
621 key1: &B,
622 key2: &B,
623 capacity: Capacity,
624 ) -> BroadcastMapReceiver<Vec<u8>> {
625 self.subscribe_unwrap_or_insert(
626 BroadcastType::PointToPoint(key1.clone(), key2.clone()),
627 capacity,
628 )
629 }
630
631 /// Subscribes to a point-to-group broadcast.
632 ///
633 /// # Type Parameters
634 ///
635 /// - `B`: The type implementing `BroadcastTypeTrait`.
636 ///
637 /// # Arguments
638 ///
639 /// - `&B` - The identifier for the group.
640 /// - `Capacity` - The capacity for the broadcast sender.
641 ///
642 /// # Returns
643 ///
644 /// - `BroadcastMapReceiver<Vec<u8>>` - A broadcast map receiver for the point-to-group broadcast.
645 #[inline]
646 fn point_to_group<B: BroadcastTypeTrait>(
647 &self,
648 key: &B,
649 capacity: Capacity,
650 ) -> BroadcastMapReceiver<Vec<u8>> {
651 self.subscribe_unwrap_or_insert(BroadcastType::PointToGroup(key.clone()), capacity)
652 }
653
654 /// Retrieves the current receiver count for a given broadcast type.
655 ///
656 /// # Type Parameters
657 ///
658 /// - `B`: The type implementing `BroadcastTypeTrait`.
659 ///
660 /// # Arguments
661 ///
662 /// - `BroadcastType<B>` - The broadcast type for which to get the receiver count.
663 ///
664 /// # Returns
665 ///
666 /// - `ReceiverCount` - The number of active receivers for the broadcast type, or 0 if not found.
667 #[inline]
668 pub fn receiver_count<B: BroadcastTypeTrait>(
669 &self,
670 broadcast_type: BroadcastType<B>,
671 ) -> ReceiverCount {
672 let key: String = BroadcastType::get_key(broadcast_type);
673 self.broadcast_map.receiver_count(&key).unwrap_or(0)
674 }
675
676 /// Calculates the receiver count before a connection is established.
677 ///
678 /// Ensures the count does not exceed the maximum allowed value minus one.
679 ///
680 /// # Type Parameters
681 ///
682 /// - `B`: The type implementing `BroadcastTypeTrait`.
683 ///
684 /// # Arguments
685 ///
686 /// - `BroadcastType<B>` - The broadcast type for which to get the receiver count.
687 ///
688 /// # Returns
689 ///
690 /// - `ReceiverCount` - The receiver count after the connection is established.
691 #[inline]
692 pub fn receiver_count_before_connected<B: BroadcastTypeTrait>(
693 &self,
694 broadcast_type: BroadcastType<B>,
695 ) -> ReceiverCount {
696 let count: ReceiverCount = self.receiver_count(broadcast_type);
697 count.clamp(0, ReceiverCount::MAX - 1) + 1
698 }
699
700 /// Calculates the receiver count after a connection is closed.
701 ///
702 /// Ensures the count does not go below 0.
703 ///
704 /// # Type Parameters
705 ///
706 /// - `B`: The type implementing `BroadcastTypeTrait`.
707 ///
708 /// # Arguments
709 ///
710 /// - `BroadcastType<B>` - The broadcast type for which to get the receiver count.
711 ///
712 /// # Returns
713 ///
714 /// - `ReceiverCount` - The receiver count after the connection is closed.
715 #[inline]
716 pub fn receiver_count_after_closed<B: BroadcastTypeTrait>(
717 &self,
718 broadcast_type: BroadcastType<B>,
719 ) -> ReceiverCount {
720 let count: ReceiverCount = self.receiver_count(broadcast_type);
721 count.clamp(1, ReceiverCount::MAX) - 1
722 }
723
724 /// Sends data to all active receivers for a given broadcast type.
725 ///
726 /// # Type Parameters
727 ///
728 /// - `T`: The type of data to send, which must be convertible to `Vec<u8>`.
729 /// - `B`: The type implementing `BroadcastTypeTrait`.
730 ///
731 /// # Arguments
732 ///
733 /// - `BroadcastType<B>` - The broadcast type to which to send the data.
734 /// - `T` - The data to send.
735 ///
736 /// # Returns
737 ///
738 /// - `BroadcastMapSendResult<Vec<u8>>` - A result indicating the success or failure of the send operation.
739 #[inline]
740 pub fn send<T, B>(
741 &self,
742 broadcast_type: BroadcastType<B>,
743 data: T,
744 ) -> BroadcastMapSendResult<Vec<u8>>
745 where
746 T: Into<Vec<u8>>,
747 B: BroadcastTypeTrait,
748 {
749 let key: String = BroadcastType::get_key(broadcast_type);
750 self.broadcast_map.send(&key, data.into())
751 }
752
753 /// Runs the WebSocket connection, handling incoming requests and outgoing messages.
754 ///
755 /// This asynchronous function continuously monitors for new WebSocket requests
756 /// and incoming broadcast messages, processing them according to the configured hooks.
757 ///
758 /// # Type Parameters
759 ///
760 /// - `B`: The type implementing `BroadcastTypeTrait`.
761 ///
762 /// # Arguments
763 ///
764 /// - `WebSocketConfig<B>` - The WebSocket configuration containing the configuration for this WebSocket instance.
765 ///
766 /// # Panics
767 ///
768 /// Panics if the context in the WebSocket configuration is not set (i.e., it's the default context).
769 /// Panics if the broadcast type in the WebSocket configuration is `BroadcastType::Unknown`.
770 pub async fn run<B: BroadcastTypeTrait>(&self, config: WebSocketConfig<B>) {
771 let ctx: Context = config.get_context().clone();
772 if ctx.to_string() == Context::default().to_string() {
773 panic!("Context must be set");
774 }
775 let buffer_size: usize = config.get_buffer_size();
776 let capacity: Capacity = config.get_capacity();
777 let broadcast_type: BroadcastType<B> = config.get_broadcast_type().clone();
778 let mut receiver: Receiver<Vec<u8>> = match &broadcast_type {
779 BroadcastType::PointToPoint(key1, key2) => self.point_to_point(key1, key2, capacity),
780 BroadcastType::PointToGroup(key) => self.point_to_group(key, capacity),
781 BroadcastType::Unknown => panic!("BroadcastType must be PointToPoint or PointToGroup"),
782 };
783 let key: String = BroadcastType::get_key(broadcast_type);
784 config.get_connected_hook()(&ctx).await;
785 let result_handle = || async {
786 ctx.aborted().await;
787 ctx.closed().await;
788 };
789 loop {
790 tokio::select! {
791 request_res = ctx.ws_from_stream(buffer_size) => {
792 let mut need_break = false;
793 if request_res.is_ok() {
794 config.get_request_hook()(&ctx).await;
795 } else {
796 need_break = true;
797 config.get_closed_hook()(&ctx).await;
798 }
799 let body: ResponseBody = ctx.get_response_body().await;
800 let is_err: bool = self.broadcast_map.send(&key, body).is_err();
801 config.get_sended_hook()(&ctx).await;
802 if need_break || is_err {
803 break;
804 }
805 },
806 msg_res = receiver.recv() => {
807 if let Ok(msg) = &msg_res {
808 let frame_list: Vec<ResponseBody> = WebSocketFrame::create_frame_list(msg);
809 if ctx.send_body_list_with_data(&frame_list).await.is_ok() {
810 continue;
811 }
812 }
813 break;
814 }
815 }
816 }
817 result_handle().await;
818 }
819}