hyperlane_plugin_websocket/websocket/impl.rs
1use crate::*;
2
3/// Allows `String` to be used as a broadcast identifier.
4impl BroadcastTypeTrait for String {}
5/// Allows string slices to be used as broadcast identifiers.
6impl BroadcastTypeTrait for &str {}
7/// Allows `char` to be used as a broadcast identifier.
8impl BroadcastTypeTrait for char {}
9/// Allows `bool` to be used as a broadcast identifier.
10impl BroadcastTypeTrait for bool {}
11/// Allows `i8` to be used as a broadcast identifier.
12impl BroadcastTypeTrait for i8 {}
13/// Allows `i16` to be used as a broadcast identifier.
14impl BroadcastTypeTrait for i16 {}
15/// Allows `i32` to be used as a broadcast identifier.
16impl BroadcastTypeTrait for i32 {}
17/// Allows `i64` to be used as a broadcast identifier.
18impl BroadcastTypeTrait for i64 {}
19/// Allows `i128` to be used as a broadcast identifier.
20impl BroadcastTypeTrait for i128 {}
21/// Allows `isize` to be used as a broadcast identifier.
22impl BroadcastTypeTrait for isize {}
23/// Allows `u8` to be used as a broadcast identifier.
24impl BroadcastTypeTrait for u8 {}
25/// Allows `u16` to be used as a broadcast identifier.
26impl BroadcastTypeTrait for u16 {}
27/// Allows `u32` to be used as a broadcast identifier.
28impl BroadcastTypeTrait for u32 {}
29/// Allows `u64` to be used as a broadcast identifier.
30impl BroadcastTypeTrait for u64 {}
31/// Allows `u128` to be used as a broadcast identifier.
32impl BroadcastTypeTrait for u128 {}
33/// Allows `usize` to be used as a broadcast identifier.
34impl BroadcastTypeTrait for usize {}
35/// Allows `f32` to be used as a broadcast identifier.
36impl BroadcastTypeTrait for f32 {}
37/// Allows `f64` to be used as a broadcast identifier.
38impl BroadcastTypeTrait for f64 {}
39/// Allows `IpAddr` to be used as a broadcast identifier.
40impl BroadcastTypeTrait for IpAddr {}
41/// Allows `Ipv4Addr` to be used as a broadcast identifier.
42impl BroadcastTypeTrait for Ipv4Addr {}
43/// Allows `Ipv6Addr` to be used as a broadcast identifier.
44impl BroadcastTypeTrait for Ipv6Addr {}
45/// Allows `SocketAddr` to be used as a broadcast identifier.
46impl BroadcastTypeTrait for SocketAddr {}
47/// Allows `NonZeroU8` to be used as a broadcast identifier.
48impl BroadcastTypeTrait for NonZeroU8 {}
49/// Allows `NonZeroU16` to be used as a broadcast identifier.
50impl BroadcastTypeTrait for NonZeroU16 {}
51/// Allows `NonZeroU32` to be used as a broadcast identifier.
52impl BroadcastTypeTrait for NonZeroU32 {}
53/// Allows `NonZeroU64` to be used as a broadcast identifier.
54impl BroadcastTypeTrait for NonZeroU64 {}
55/// Allows `NonZeroU128` to be used as a broadcast identifier.
56impl BroadcastTypeTrait for NonZeroU128 {}
57/// Allows `NonZeroUsize` to be used as a broadcast identifier.
58impl BroadcastTypeTrait for NonZeroUsize {}
59/// Allows `NonZeroI8` to be used as a broadcast identifier.
60impl BroadcastTypeTrait for NonZeroI8 {}
61/// Allows `NonZeroI16` to be used as a broadcast identifier.
62impl BroadcastTypeTrait for NonZeroI16 {}
63/// Allows `NonZeroI32` to be used as a broadcast identifier.
64impl BroadcastTypeTrait for NonZeroI32 {}
65/// Allows `NonZeroI64` to be used as a broadcast identifier.
66impl BroadcastTypeTrait for NonZeroI64 {}
67/// Allows `NonZeroI128` to be used as a broadcast identifier.
68impl BroadcastTypeTrait for NonZeroI128 {}
69/// Allows `NonZeroIsize` to be used as a broadcast identifier.
70impl BroadcastTypeTrait for NonZeroIsize {}
71/// Allows `Infallible` to be used as a broadcast identifier.
72impl BroadcastTypeTrait for Infallible {}
73
74/// Allows references to `String` to be used as broadcast identifiers.
75impl BroadcastTypeTrait for &String {}
76/// Allows double references to string slices to be used as broadcast identifiers.
77impl BroadcastTypeTrait for &&str {}
78/// Allows references to `char` to be used as broadcast identifiers.
79impl BroadcastTypeTrait for &char {}
80/// Allows references to `bool` to be used as broadcast identifiers.
81impl BroadcastTypeTrait for &bool {}
82/// Allows references to `i8` to be used as broadcast identifiers.
83impl BroadcastTypeTrait for &i8 {}
84/// Allows references to `i16` to be used as broadcast identifiers.
85impl BroadcastTypeTrait for &i16 {}
86/// Allows references to `i32` to be used as broadcast identifiers.
87impl BroadcastTypeTrait for &i32 {}
88/// Allows references to `i64` to be used as broadcast identifiers.
89impl BroadcastTypeTrait for &i64 {}
90/// Allows references to `i128` to be used as broadcast identifiers.
91impl BroadcastTypeTrait for &i128 {}
92/// Allows references to `isize` to be used as broadcast identifiers.
93impl BroadcastTypeTrait for &isize {}
94/// Allows references to `u8` to be used as broadcast identifiers.
95impl BroadcastTypeTrait for &u8 {}
96/// Allows references to `u16` to be used as broadcast identifiers.
97impl BroadcastTypeTrait for &u16 {}
98/// Allows references to `u32` to be used as broadcast identifiers.
99impl BroadcastTypeTrait for &u32 {}
100/// Allows references to `u64` to be used as
101/// Implements `BroadcastTypeTrait` for `&u128`.
102///
103/// This allows references to `u128` to be used as a broadcast identifier.
104impl BroadcastTypeTrait for &u128 {}
105/// Implements `BroadcastTypeTrait` for `&usize`.
106///
107/// This allows references to `usize` to be used as a broadcast identifier.
108impl BroadcastTypeTrait for &usize {}
109/// Implements `BroadcastTypeTrait` for `&f32`.
110///
111/// This allows references to `f32` to be used as a broadcast identifier.
112impl BroadcastTypeTrait for &f32 {}
113/// Implements `BroadcastTypeTrait` for `&f64`.
114///
115/// This allows references to `f64` to be used as a broadcast identifier.
116impl BroadcastTypeTrait for &f64 {}
117/// Implements `BroadcastTypeTrait` for `&IpAddr`.
118///
119/// This allows references to `IpAddr` to be used as a broadcast identifier.
120impl BroadcastTypeTrait for &IpAddr {}
121/// Implements `BroadcastTypeTrait` for `&Ipv4Addr`.
122///
123/// This allows references to `Ipv4Addr` to be used as a broadcast identifier.
124impl BroadcastTypeTrait for &Ipv4Addr {}
125/// Implements `BroadcastTypeTrait` for `&Ipv6Addr`.
126///
127/// This allows references to `Ipv6Addr` to be used as a broadcast identifier.
128impl BroadcastTypeTrait for &Ipv6Addr {}
129/// Implements `BroadcastTypeTrait` for `&SocketAddr`.
130///
131/// This allows references to `SocketAddr` to be used as a broadcast identifier.
132impl BroadcastTypeTrait for &SocketAddr {}
133/// Implements `BroadcastTypeTrait` for `&NonZeroU8`.
134///
135/// This allows references to `NonZeroU8` to be used as a broadcast identifier.
136impl BroadcastTypeTrait for &NonZeroU8 {}
137/// Implements `BroadcastTypeTrait` for `&NonZeroU16`.
138///
139/// This allows references to `NonZeroU16` to be used as a broadcast identifier.
140impl BroadcastTypeTrait for &NonZeroU16 {}
141/// Implements `BroadcastTypeTrait` for `&NonZeroU32`.
142///
143/// This allows references to `NonZeroU32` to be used as a broadcast identifier.
144impl BroadcastTypeTrait for &NonZeroU32 {}
145/// Implements `BroadcastTypeTrait` for `&NonZeroU64`.
146///
147/// This allows references to `NonZeroU64` to be used as a broadcast identifier.
148impl BroadcastTypeTrait for &NonZeroU64 {}
149/// Implements `BroadcastTypeTrait` for `&NonZeroU128`.
150///
151/// This allows references to `NonZeroU128` to be used as a broadcast identifier.
152impl BroadcastTypeTrait for &NonZeroU128 {}
153/// Implements `BroadcastTypeTrait` for `&NonZeroUsize`.
154///
155/// This allows references to `NonZeroUsize` to be used as a broadcast identifier.
156impl BroadcastTypeTrait for &NonZeroUsize {}
157/// Implements `BroadcastTypeTrait` for `&NonZeroI8`.
158///
159/// This allows references to `NonZeroI8` to be used as a broadcast identifier.
160impl BroadcastTypeTrait for &NonZeroI8 {}
161/// Implements `BroadcastTypeTrait` for `&NonZeroI16`.
162///
163/// This allows references to `NonZeroI16` to be used as a broadcast identifier.
164impl BroadcastTypeTrait for &NonZeroI16 {}
165/// Implements `BroadcastTypeTrait` for `&NonZeroI32`.
166///
167/// This allows references to `NonZeroI32` to be used as a broadcast identifier.
168impl BroadcastTypeTrait for &NonZeroI32 {}
169/// Implements `BroadcastTypeTrait` for `&NonZeroI64`.
170///
171/// This allows references to `NonZeroI64` to be used as a broadcast identifier.
172impl BroadcastTypeTrait for &NonZeroI64 {}
173/// Implements `BroadcastTypeTrait` for `&NonZeroI128`.
174///
175/// This allows references to `NonZeroI128` to be used as a broadcast identifier.
176impl BroadcastTypeTrait for &NonZeroI128 {}
177/// Implements `BroadcastTypeTrait` for `&NonZeroIsize`.
178///
179/// This allows references to `NonZeroIsize` to be used as a broadcast identifier.
180impl BroadcastTypeTrait for &NonZeroIsize {}
181/// Implements `BroadcastTypeTrait` for `&Infallible`.
182///
183/// This allows references to `Infallible` to be used as a broadcast identifier.
184impl BroadcastTypeTrait for &Infallible {}
185
186/// Implements the `Default` trait for `BroadcastType`.
187///
188/// The default value is `BroadcastType::Unknown`.
189///
190/// # Type Parameters
191///
192/// - `B`: The type parameter for `BroadcastType`, which must implement `BroadcastTypeTrait`.
193impl<B: BroadcastTypeTrait> Default for BroadcastType<B> {
194 fn default() -> Self {
195 BroadcastType::Unknown
196 }
197}
198
199impl<B: BroadcastTypeTrait> BroadcastType<B> {
200 /// Generates a unique key string for a given broadcast type.
201 ///
202 /// For point-to-point types, the keys are sorted to ensure consistent key generation
203 /// regardless of the order of the input keys.
204 ///
205 /// # Arguments
206 ///
207 /// - `BroadcastType<B>` - The broadcast type for which to generate the key.
208 ///
209 /// # Returns
210 ///
211 /// - `String` - The unique key string for the broadcast type.
212 pub fn get_key(broadcast_type: BroadcastType<B>) -> String {
213 match broadcast_type {
214 BroadcastType::PointToPoint(key1, key2) => {
215 let (first_key, second_key) = if key1 <= key2 {
216 (key1, key2)
217 } else {
218 (key2, key1)
219 };
220 format!(
221 "{}-{}-{}",
222 POINT_TO_POINT_KEY,
223 first_key.to_string(),
224 second_key.to_string()
225 )
226 }
227 BroadcastType::PointToGroup(key) => {
228 format!("{}-{}", POINT_TO_GROUP_KEY, key.to_string())
229 }
230 BroadcastType::Unknown => String::new(),
231 }
232 }
233}
234
235/// Implements the `Default` trait for `WebSocketConfig`.
236///
237/// Provides a default configuration for WebSocket connections, including
238/// default hook functions that do nothing.
239///
240/// # Type Parameters
241///
242/// - `B`: The type parameter for `WebSocketConfig`, which must implement `BroadcastTypeTrait`.
243impl<B: BroadcastTypeTrait> Default for WebSocketConfig<B> {
244 fn default() -> Self {
245 let default_hook: ArcFnContextPinBoxSendSync<()> = Arc::new(move |_| Box::pin(async {}));
246 Self {
247 context: Context::default(),
248 buffer_size: DEFAULT_BUFFER_SIZE,
249 capacity: DEFAULT_BROADCAST_SENDER_CAPACITY,
250 broadcast_type: BroadcastType::default(),
251 request_hook: default_hook.clone(),
252 sended_hook: default_hook.clone(),
253 closed_hook: default_hook,
254 }
255 }
256}
257
258impl<B: BroadcastTypeTrait> WebSocketConfig<B> {
259 /// Creates a new WebSocket configuration with default values.
260 ///
261 /// # Returns
262 ///
263 /// - `WebSocketConfig<B>` - A new WebSocket configuration instance.
264 pub fn new() -> Self {
265 Self::default()
266 }
267
268 /// Sets the buffer size for the WebSocket connection.
269 ///
270 /// # Arguments
271 ///
272 /// - `usize` - The desired buffer size in bytes.
273 ///
274 /// # Returns
275 ///
276 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
277 pub fn set_buffer_size(mut self, buffer_size: usize) -> Self {
278 self.buffer_size = buffer_size;
279 self
280 }
281
282 /// Sets the capacity for the broadcast sender.
283 ///
284 /// # Arguments
285 ///
286 /// - `Capacity` - The desired capacity.
287 ///
288 /// # Returns
289 ///
290 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
291 pub fn set_capacity(mut self, capacity: Capacity) -> Self {
292 self.capacity = capacity;
293 self
294 }
295
296 /// Sets the context for the WebSocket connection.
297 ///
298 /// # Arguments
299 ///
300 /// - `Context` - The context object to associate with the WebSocket.
301 ///
302 /// # Returns
303 ///
304 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
305 pub fn set_context(mut self, context: Context) -> Self {
306 self.context = context;
307 self
308 }
309
310 /// Sets the broadcast type for the WebSocket connection.
311 ///
312 /// # Arguments
313 ///
314 /// - `BroadcastType<B>` - The broadcast type to use for this WebSocket.
315 ///
316 /// # Returns
317 ///
318 /// - `WebSocketConfig<B>` - The modified WebSocket configuration instance.
319 pub fn set_broadcast_type(mut self, broadcast_type: BroadcastType<B>) -> Self {
320 self.broadcast_type = broadcast_type;
321 self
322 }
323
324 /// Sets the request hook function.
325 ///
326 /// This hook is executed when a new request is received.
327 ///
328 /// # Type Parameters
329 ///
330 /// - `F`: The type of the function, which must be `Fn(Context) -> Fut + Send + Sync + 'static`.
331 /// - `Fut`: The future returned by the function, which must be `Future<Output = ()> + Send + 'static`.
332 ///
333 /// # Arguments
334 ///
335 /// - `hook` - The function to be used as the request hook.
336 ///
337 /// # Returns
338 ///
339 /// The modified WebSocket configuration instance.
340 pub fn set_request_hook<F, Fut>(mut self, hook: F) -> Self
341 where
342 F: Fn(Context) -> Fut + Send + Sync + 'static,
343 Fut: Future<Output = ()> + Send + 'static,
344 {
345 self.request_hook = Arc::new(move |ctx| Box::pin(hook(ctx)));
346 self
347 }
348
349 /// Sets the sended hook function.
350 ///
351 /// This hook is executed after a message has been sent.
352 ///
353 /// # Type Parameters
354 ///
355 /// - `F`: The type of the function, which must be `Fn(Context) -> Fut + Send + Sync + 'static`.
356 /// - `Fut`: The future returned by the function, which must be `Future<Output = ()> + Send + 'static`.
357 ///
358 /// # Arguments
359 ///
360 /// - `hook` - The function to be used as the sended hook.
361 ///
362 /// # Returns
363 ///
364 /// The modified WebSocket configuration instance.
365 pub fn set_sended_hook<F, Fut>(mut self, hook: F) -> Self
366 where
367 F: Fn(Context) -> Fut + Send + Sync + 'static,
368 Fut: Future<Output = ()> + Send + 'static,
369 {
370 self.sended_hook = Arc::new(move |ctx| Box::pin(hook(ctx)));
371 self
372 }
373
374 /// Sets the closed hook function.
375 ///
376 /// This hook is executed when the WebSocket connection is closed.
377 ///
378 /// # Type Parameters
379 ///
380 /// - `F`: The type of the function, which must be `Fn(Context) -> Fut + Send + Sync + 'static`.
381 /// - `Fut`: The future returned by the function, which must be `Future<Output = ()> + Send + 'static`.
382 ///
383 /// # Arguments
384 ///
385 /// - `hook` - The function to be used as the closed hook.
386 ///
387 /// # Returns
388 ///
389 /// The modified WebSocket configuration instance.
390 pub fn set_closed_hook<F, Fut>(mut self, hook: F) -> Self
391 where
392 F: Fn(Context) -> Fut + Send + Sync + 'static,
393 Fut: Future<Output = ()> + Send + 'static,
394 {
395 self.closed_hook = Arc::new(move |ctx| Box::pin(hook(ctx)));
396 self
397 }
398
399 /// Retrieves a reference to the context associated with this configuration.
400 ///
401 /// # Returns
402 ///
403 /// - `&Context` - A reference to the context object.
404 pub fn get_context(&self) -> &Context {
405 &self.context
406 }
407
408 /// Retrieves the buffer size configured for the WebSocket connection.
409 ///
410 /// # Returns
411 ///
412 /// - `usize` - The buffer size in bytes.
413 pub fn get_buffer_size(&self) -> usize {
414 self.buffer_size
415 }
416
417 /// Retrieves the capacity configured for the broadcast sender.
418 ///
419 /// # Returns
420 ///
421 /// - `Capacity` - The capacity.
422 pub fn get_capacity(&self) -> Capacity {
423 self.capacity
424 }
425
426 /// Retrieves a reference to the broadcast type configured for this WebSocket.
427 ///
428 /// # Returns
429 ///
430 /// - `&BroadcastType<B>` - A reference to the broadcast type object.
431 pub fn get_broadcast_type(&self) -> &BroadcastType<B> {
432 &self.broadcast_type
433 }
434
435 /// Retrieves a reference to the request hook function.
436 ///
437 /// # Returns
438 ///
439 /// - `&ArcFnContextPinBoxSendSync<()>` - A reference to the request hook.
440 pub fn get_request_hook(&self) -> &ArcFnContextPinBoxSendSync<()> {
441 &self.request_hook
442 }
443
444 /// Retrieves a reference to the sended hook function.
445 ///
446 /// # Returns
447 ///
448 /// - `&ArcFnContextPinBoxSendSync<()>` - A reference to the sended hook.
449 pub fn get_sended_hook(&self) -> &ArcFnContextPinBoxSendSync<()> {
450 &self.sended_hook
451 }
452
453 /// Retrieves a reference to the closed hook function.
454 ///
455 /// # Returns
456 ///
457 /// - `&ArcFnContextPinBoxSendSync<()>` - A reference to the closed hook.
458 pub fn get_closed_hook(&self) -> &ArcFnContextPinBoxSendSync<()> {
459 &self.closed_hook
460 }
461}
462
463impl WebSocket {
464 /// Creates a new WebSocket instance.
465 ///
466 /// Initializes with a default broadcast map.
467 ///
468 /// # Returns
469 ///
470 /// - `WebSocket` - A new WebSocket instance.
471 pub fn new() -> Self {
472 Self {
473 broadcast_map: BroadcastMap::default(),
474 }
475 }
476
477 /// Subscribes to a broadcast type or inserts a new one if it doesn't exist.
478 ///
479 /// # Type Parameters
480 ///
481 /// - `B`: The type implementing `BroadcastTypeTrait`.
482 ///
483 /// # Arguments
484 ///
485 /// - `BroadcastType<B>` - The broadcast type to subscribe to.
486 /// - `Capacity` - The capacity for the broadcast sender if a new one is inserted.
487 ///
488 /// # Returns
489 ///
490 /// - `BroadcastMapReceiver<Vec<u8>>` - A broadcast map receiver for the specified broadcast type.
491 fn subscribe_unwrap_or_insert<B: BroadcastTypeTrait>(
492 &self,
493 broadcast_type: BroadcastType<B>,
494 capacity: Capacity,
495 ) -> BroadcastMapReceiver<Vec<u8>> {
496 let key: String = BroadcastType::get_key(broadcast_type);
497 self.broadcast_map.subscribe_or_insert(&key, capacity)
498 }
499
500 /// Subscribes to a point-to-point broadcast.
501 ///
502 /// # Type Parameters
503 ///
504 /// - `B`: The type implementing `BroadcastTypeTrait`.
505 ///
506 /// # Arguments
507 ///
508 /// - `&B` - The first identifier for the point-to-point communication.
509 /// - `&B` - The second identifier for the point-to-point communication.
510 /// - `Capacity` - The capacity for the broadcast sender.
511 ///
512 /// # Returns
513 ///
514 /// - `BroadcastMapReceiver<Vec<u8>>` - A broadcast map receiver for the point-to-point broadcast.
515 fn point_to_point<B: BroadcastTypeTrait>(
516 &self,
517 key1: &B,
518 key2: &B,
519 capacity: Capacity,
520 ) -> BroadcastMapReceiver<Vec<u8>> {
521 self.subscribe_unwrap_or_insert(
522 BroadcastType::PointToPoint(key1.clone(), key2.clone()),
523 capacity,
524 )
525 }
526
527 /// Subscribes to a point-to-group broadcast.
528 ///
529 /// # Type Parameters
530 ///
531 /// - `B`: The type implementing `BroadcastTypeTrait`.
532 ///
533 /// # Arguments
534 ///
535 /// - `&B` - The identifier for the group.
536 /// - `Capacity` - The capacity for the broadcast sender.
537 ///
538 /// # Returns
539 ///
540 /// - `BroadcastMapReceiver<Vec<u8>>` - A broadcast map receiver for the point-to-group broadcast.
541 fn point_to_group<B: BroadcastTypeTrait>(
542 &self,
543 key: &B,
544 capacity: Capacity,
545 ) -> BroadcastMapReceiver<Vec<u8>> {
546 self.subscribe_unwrap_or_insert(BroadcastType::PointToGroup(key.clone()), capacity)
547 }
548
549 /// Retrieves the current receiver count for a given broadcast type.
550 ///
551 /// # Type Parameters
552 ///
553 /// - `B`: The type implementing `BroadcastTypeTrait`.
554 ///
555 /// # Arguments
556 ///
557 /// - `BroadcastType<B>` - The broadcast type for which to get the receiver count.
558 ///
559 /// # Returns
560 ///
561 /// - `ReceiverCount` - The number of active receivers for the broadcast type, or 0 if not found.
562 pub fn receiver_count<'a, B: BroadcastTypeTrait>(
563 &self,
564 broadcast_type: BroadcastType<B>,
565 ) -> ReceiverCount {
566 let key: String = BroadcastType::get_key(broadcast_type);
567 self.broadcast_map.receiver_count(&key).unwrap_or(0)
568 }
569
570 /// Calculates the receiver count after incrementing it.
571 ///
572 /// Ensures the count does not exceed the maximum allowed value minus one.
573 ///
574 /// # Type Parameters
575 ///
576 /// - `B`: The type implementing `BroadcastTypeTrait`.
577 ///
578 /// # Arguments
579 ///
580 /// - `BroadcastType<B>` - The broadcast type for which to increment the receiver count.
581 ///
582 /// # Returns
583 ///
584 /// - `ReceiverCount` - The incremented receiver count.
585 pub fn receiver_count_after_increment<B: BroadcastTypeTrait>(
586 &self,
587 broadcast_type: BroadcastType<B>,
588 ) -> ReceiverCount {
589 let count: ReceiverCount = self.receiver_count(broadcast_type);
590 count.max(0).min(ReceiverCount::MAX - 1) + 1
591 }
592
593 /// Calculates the receiver count after decrementing it.
594 ///
595 /// Ensures the count does not go below 0.
596 ///
597 /// # Type Parameters
598 ///
599 /// - `B`: The type implementing `BroadcastTypeTrait`.
600 ///
601 /// # Arguments
602 ///
603 /// - `BroadcastType<B>` - The broadcast type for which to decrement the receiver count.
604 ///
605 /// # Returns
606 ///
607 /// - `ReceiverCount` - The decremented receiver count.
608 pub fn receiver_count_after_decrement<B: BroadcastTypeTrait>(
609 &self,
610 broadcast_type: BroadcastType<B>,
611 ) -> ReceiverCount {
612 let count: ReceiverCount = self.receiver_count(broadcast_type);
613 count.max(1).min(ReceiverCount::MAX) - 1
614 }
615
616 /// Sends data to all active receivers for a given broadcast type.
617 ///
618 /// # Type Parameters
619 ///
620 /// - `T`: The type of data to send, which must be convertible to `Vec<u8>`.
621 /// - `B`: The type implementing `BroadcastTypeTrait`.
622 ///
623 /// # Arguments
624 ///
625 /// - `BroadcastType<B>` - The broadcast type to which to send the data.
626 /// - `T` - The data to send.
627 ///
628 /// # Returns
629 ///
630 /// - `BroadcastMapSendResult<Vec<u8>>` - A result indicating the success or failure of the send operation.
631 pub fn send<T, B>(
632 &self,
633 broadcast_type: BroadcastType<B>,
634 data: T,
635 ) -> BroadcastMapSendResult<Vec<u8>>
636 where
637 T: Into<Vec<u8>>,
638 B: BroadcastTypeTrait,
639 {
640 let key: String = BroadcastType::get_key(broadcast_type);
641 self.broadcast_map.send(&key, data.into())
642 }
643
644 /// Runs the WebSocket connection, handling incoming requests and outgoing messages.
645 ///
646 /// This asynchronous function continuously monitors for new WebSocket requests
647 /// and incoming broadcast messages, processing them according to the configured hooks.
648 ///
649 /// # Type Parameters
650 ///
651 /// - `B`: The type implementing `BroadcastTypeTrait`.
652 ///
653 /// # Arguments
654 ///
655 /// - `WebSocketConfig<B>` - The WebSocket configuration containing the configuration for this WebSocket instance.
656 ///
657 /// # Panics
658 ///
659 /// Panics if the context in the WebSocket configuration is not set (i.e., it's the default context).
660 /// Panics if the broadcast type in the WebSocket configuration is `BroadcastType::Unknown`.
661 pub async fn run<B: BroadcastTypeTrait>(&self, config: WebSocketConfig<B>) {
662 let ctx: Context = config.get_context().clone();
663 if ctx.to_string() == Context::default().to_string() {
664 panic!("Context must be set");
665 }
666 let buffer_size: usize = config.get_buffer_size();
667 let capacity: Capacity = config.get_capacity();
668 let broadcast_type: BroadcastType<B> = config.get_broadcast_type().clone();
669 let mut receiver: Receiver<Vec<u8>> = match &broadcast_type {
670 BroadcastType::PointToPoint(key1, key2) => self.point_to_point(key1, key2, capacity),
671 BroadcastType::PointToGroup(key) => self.point_to_group(key, capacity),
672 BroadcastType::Unknown => panic!("BroadcastType must be PointToPoint or PointToGroup"),
673 };
674 let key: String = BroadcastType::get_key(broadcast_type);
675 let result_handle = || async {
676 ctx.aborted().await;
677 ctx.closed().await;
678 };
679 loop {
680 tokio::select! {
681 request_res = ctx.ws_from_stream(buffer_size) => {
682 let mut need_break = false;
683 if request_res.is_ok() {
684 config.get_request_hook()(ctx.clone()).await;
685 } else {
686 need_break = true;
687 config.get_closed_hook()(ctx.clone()).await;
688 }
689 let body: ResponseBody = ctx.get_response_body().await;
690 let is_err: bool = self.broadcast_map.send(&key, body).is_err();
691 config.get_sended_hook()(ctx.clone()).await;
692 if need_break || is_err {
693 break;
694 }
695 },
696 msg_res = receiver.recv() => {
697 if let Ok(msg) = msg_res {
698 if ctx.set_response_body(msg).await.send_body().await.is_ok() {
699 continue;
700 }
701 }
702 break;
703 }
704 }
705 }
706 result_handle().await;
707 }
708}