axum_wsb/
lib.rs

1#[cfg(feature = "typed")]
2pub mod typed {
3    use serde::Serialize;
4    use futures_util::{Sink, Stream, SinkExt, stream::{SplitSink, SplitStream, StreamExt}};
5    use std::{fmt::Display, sync::Arc};
6    use tokio::sync::RwLock;
7    use axum_typed_websockets::{Message, WebSocket};
8    use axum_7_9::extract::ws::CloseFrame;
9
10    /// main broadcaster for typed api.
11    #[derive(Debug)]
12    pub struct Broadcaster<T, S> {
13        pub rooms: Vec<Room<T, S>>
14    }
15
16    /// room implementation.
17    #[derive(Debug)]
18    pub struct Room<T, S> {
19        pub id: String,
20        pub connections: Vec<Connection<T, S>>
21    }
22
23    /// type for each individual connection.
24    #[derive(Debug)]
25    pub struct Connection<T, S> {
26        pub id: String,
27        pub receiver: SplitSink<WebSocket<T, S>, Message<T>>
28    }
29
30    impl<T: Display, S: Display> Connection<T, S> where SplitSink<WebSocket<T, S>, Message<T>>: Sink<Message<T>> + Unpin {
31        /// create a connection:
32        pub fn create(id: String, receiver: SplitSink<WebSocket<T, S>, Message<T>>) -> Self {
33            Self {
34                id, 
35                receiver
36            }
37        }
38
39        /// send the text message.
40        pub async fn send(&mut self, message: T) where T: Clone {
41            let _ = self.receiver.send(Message::Item(message)).await;
42        }
43
44                /// send the text message if the given condition in it's closure is true.
45        pub async fn send_if<F>(&mut self, message: T, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
46            if condition(&self) { 
47                let _ = self.receiver.send(Message::Item(message)).await;
48            } 
49        }
50
51        /// sen the text message if the given condition in it's closure is true.
52        pub async fn send_if_not<F>(&mut self, message: T, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
53            if !condition(&self) { 
54                let _ = self.receiver.send(Message::Item(message)).await;
55            } 
56        }
57
58        /// send the text message.
59        pub async fn ping(&mut self, message: &Vec<u8>) where T: Clone {
60            let _ = self.receiver.send(Message::Ping(message.clone())).await;
61        }
62
63        /// send the text message if the given condition in it's closure is true.
64        pub async fn ping_if<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
65            if condition(&self) { 
66                let _ = self.receiver.send(Message::Ping(message.clone())).await;
67            } 
68        }
69
70        /// sen the text message if the given condition in it's closure is true.
71        pub async fn ping_if_not<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
72            if !condition(&self) { 
73                let _ = self.receiver.send(Message::Ping(message.clone())).await;
74            } 
75        }
76
77        /// send the text message.
78        pub async fn pong(&mut self, message: &Vec<u8>) where T: Clone {
79            let _ = self.receiver.send(Message::Pong(message.clone())).await;
80        }
81
82                /// send the text message if the given condition in it's closure is true.
83        pub async fn pong_if<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
84            if condition(&self) { 
85                let _ = self.receiver.send(Message::Pong(message.clone())).await;
86            } 
87        }
88
89        /// sen the text message if the given condition in it's closure is true.
90        pub async fn pong_if_not<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
91            if !condition(&self) { 
92                let _ = self.receiver.send(Message::Pong(message.clone())).await;
93            } 
94        }
95    }
96
97    impl<T: Display + Serialize, S: Display + Serialize> Room<T, S> where SplitSink<WebSocket<T, S>, Message<T>>: Sink<Message<T>> + Unpin {
98        /// check if a connection with given id exist and if it's not, add a connection to a room with that ip:
99        pub fn add_connection(&mut self, id: &String, receiver: SplitSink<WebSocket<T, S>, Message<T>>) {
100            let check_is_connection_exist = self.connections.iter().any(|room| room.id == *id);
101
102            match check_is_connection_exist {
103                true => (),
104                false => {
105                    let connection = Connection {
106                        id: id.clone(),
107                        receiver
108                    };
109
110                    self.connections.push(connection);
111                }
112            }
113        }
114
115        /// remove a connection from room:
116        pub fn remove_connection(&mut self, id: String) {
117            self.connections.retain(|connection| { 
118                if connection.id == id { 
119                    false 
120                } else { 
121                    true 
122                }
123            });
124        }
125
126        /// check if a connection exist and return if it's.
127        pub fn check_connection(&mut self, id: &String) -> Option<&Connection<T, S>> {
128            let connection = self.connections.iter().find(|room| room.id == *id);
129
130            match connection {
131                Some(connection) => Some(connection),
132                None => None
133            }
134        }
135
136        /// broadcast the message directly:
137        pub async fn broadcast(&mut self, message: &T) where T: Clone { 
138            for connection in &mut self.connections { 
139                let msg = Message::Item(message.clone());
140                let receiver = &mut connection.receiver; 
141                
142                let _ = receiver.send(msg).await;
143            }
144        }
145
146        /// broadcast the message if the given condition in it's closure is true.
147        pub async fn broadcast_if<F>(&mut self, message: &T, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
148            for connection in &mut self.connections { 
149                if condition(connection) { 
150                    let msg = Message::Item(message.clone()); 
151                    let receiver = &mut connection.receiver; 
152                    let _ = receiver.send(msg).await;
153                } 
154            } 
155        }
156
157        /// broadcast the message if the given condition in it's closure is false.
158        pub async fn broadcast_if_not<F>(&mut self, message: &T, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
159            for connection in &mut self.connections { 
160                if !condition(connection) { 
161                    let msg = Message::Item(message.clone()); 
162                    let receiver = &mut connection.receiver; 
163                    let _ = receiver.send(msg).await;
164                } 
165            } 
166        }
167
168        /// broadcast the message directly:
169        pub async fn ping(&mut self, message: &Vec<u8>) where T: Clone {
170            for connection in &mut self.connections { 
171                let msg = Message::Ping(message.clone());
172                let receiver = &mut connection.receiver; 
173                        
174                let _ = receiver.send(msg).await;
175            }
176        }
177
178        /// broadcast the message if the given condition in it's closure is true.
179        pub async fn ping_if<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
180            for connection in &mut self.connections { 
181                if condition(connection) { 
182                    let msg = Message::Ping(message.clone()); 
183                    let receiver = &mut connection.receiver; 
184                    let _ = receiver.send(msg).await;
185                } 
186            } 
187        }
188        
189        /// broadcast the message if the given condition in it's closure is false.
190        pub async fn ping_if_not<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
191            for connection in &mut self.connections { 
192                if !condition(connection) { 
193                    let msg = Message::Ping(message.clone()); 
194                    let receiver = &mut connection.receiver; 
195                    let _ = receiver.send(msg).await;
196                } 
197            } 
198        }
199
200        /// broadcast the pong message directly:
201        pub async fn pong(&mut self, message: &Vec<u8>) where T: Clone {
202            for connection in &mut self.connections { 
203                let msg = Message::Pong(message.clone());
204                let receiver = &mut connection.receiver; 
205                                
206                let _ = receiver.send(msg).await;
207            }
208        }
209        
210        /// broadcast the pong message if the given condition in it's closure is true.
211        pub async fn pong_if<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
212            for connection in &mut self.connections { 
213                if condition(connection) { 
214                    let msg = Message::Pong(message.clone()); 
215                    let receiver = &mut connection.receiver; 
216                    let _ = receiver.send(msg).await;
217                } 
218            } 
219        }
220                
221        /// broadcast the pong message if the given condition in it's closure is false.
222        pub async fn pong_if_not<F>(&mut self, message: &Vec<u8>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
223            for connection in &mut self.connections { 
224                if !condition(connection) { 
225                    let msg = Message::Pong(message.clone()); 
226                    let receiver = &mut connection.receiver; 
227                    let _ = receiver.send(msg).await;
228                } 
229            } 
230        }
231
232        /// it's most convenient way to close a single connection but keeping room open.
233        pub async fn close_conn(&mut self, close_frame: Option<CloseFrame<'static>>, id: &String) where T: Clone {
234            self.connections.retain_mut(|connection| {
235                if connection.id == *id {
236                    let msg = Message::Close(close_frame.clone());
237                    let receiver = &mut connection.receiver; 
238                                                    
239                    let _ = async {
240                        let _ = receiver.send(msg).await;
241                    };
242                    
243                    false
244                } else {
245                    true
246                }
247            });
248        }
249
250        /// Close all connections and remove it from it's room but not close it.
251        pub async fn close(&mut self, close_frame: Option<CloseFrame<'static>>) where T: Clone { 
252            self.connections.retain_mut(|connection| {
253                let msg = Message::Close(close_frame.clone());
254                let receiver = &mut connection.receiver; 
255                                                        
256                let _ = async {
257                    let _ = receiver.send(msg).await;
258                };
259                        
260                false
261            });
262        }
263        
264        /// close each connection and remove them from room if the given condition in it's closure is true.
265        pub async fn close_if<F>(&mut self, close_frame: Option<CloseFrame<'static>>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
266            self.connections.retain_mut(|connection| {
267                if condition(&connection) {
268                    let msg = Message::Close(close_frame.clone());
269                    let receiver = &mut connection.receiver; 
270                                                            
271                    let _ = async {
272                        let _ =  receiver.send(msg).await;
273                    };
274                            
275                    false
276                } else {
277                    true
278                }
279            });
280        }
281        
282        /// close each connection and remove them from room if the given condition in it's closure is false.
283        pub async fn close_if_not<F>(&mut self, close_frame: Option<CloseFrame<'static>>, condition: F) where F: Fn(&Connection<T, S>) -> bool, T: Clone { 
284            self.connections.retain_mut(|connection| {
285                if !condition(&connection) {
286                    let msg = Message::Close(close_frame.clone());
287                    let receiver = &mut connection.receiver; 
288                                                                    
289                    let _ = async {
290                        let _ =  receiver.send(msg).await;
291                    };
292                                    
293                    false
294                } else {
295                    true
296                }
297            });
298        }
299    }
300
301    impl<T: Display + Serialize, S: Display + Serialize> Broadcaster<T, S> {
302        /// create new broadcaster:
303        pub fn new() -> Arc<RwLock<Self>> {
304            Arc::new(RwLock::new(Self::default()))
305        }
306
307        /// get receiver and stream, similar to ".handle()" method of actix-ws.
308        pub fn configure(socket: WebSocket<T, S>) -> (SplitSink<WebSocket<T, S>, Message<T>>, SplitStream<WebSocket<T, S>>) where WebSocket<T, S>: Sink<Message<T>> + Stream + Sized {
309            socket.split()
310        }
311
312        /// handle the all thing. If you use that api, there is no need to any other configuration for grouping and identifying connections:
313        pub async fn handle(broadcaster: &Arc<RwLock<Self>>, room_id: &String, conn_id: &String, receiver: SplitSink<WebSocket<T, S>, Message<T>>) -> Arc<RwLock<Self>> {
314            let mut broadcaster_write = broadcaster.write().await;
315
316            broadcaster_write.handle_room(room_id).add_connection(conn_id, receiver);
317
318            Arc::clone(&broadcaster)
319        }
320
321        /// check if a room with given id exist and if it's not create one.
322        pub fn handle_room(&mut self, id: &String) -> &mut Room<T, S> {
323            if let Some(index) = self.rooms.iter().position(|room| room.id == *id) {
324                return &mut self.rooms[index];
325            }
326        
327            self.rooms.push(Room {
328                id: id.clone(),
329                connections: vec![],
330            });
331        
332            self.rooms.last_mut().unwrap()
333        }
334
335        /// iterates through every room and does something with them immutably. You cannot mutate anything inside of it, even rooms and not captured variables.      
336        /// 
337        /// ```rust
338        /// 
339        /// use axum_wsb::typed::Broadcaster;
340        /// use tokio::sync::RwLock;
341        /// use std::sync::Arc;
342        /// 
343        /// fn main () {
344        ///     let receivers = Broadcaster::new();
345        /// 
346        ///     async {
347        ///         receivers.read().await.each_room_immut(|room| println!("hello, {}. guest!", room.id));
348        ///     };
349        /// 
350        /// }
351        /// 
352        /// 
353        /// ```
354        pub fn each_room_immut<F>(&self, f: F) where F: Fn(&Room<T, S>) {
355            for room in &self.rooms {
356                f(room);
357            }
358        }
359
360        /// iterates through every room and does something with them immutably. You cannot mutate rooms itself but can mutate captured variables.
361        /// 
362        /// ```rust
363        /// 
364        /// use axum_wsb::typed::Broadcaster;
365        /// use tokio::sync::RwLock;
366        /// use std::sync::Arc;
367        /// 
368        /// fn main () {
369        ///     let receivers = Broadcaster::new();
370        /// 
371        ///     let mut num = 0;
372        /// 
373        ///     async {
374        ///         receivers.read().await.each_room(|room| {
375        ///             num = num + 1;
376        ///         });
377        ///     };
378        /// 
379        /// 
380        ///     println!("here is number: {}", num)
381        /// }
382        /// 
383        /// 
384        /// ```
385        pub fn each_room<F>(&self, mut f: F) where F: FnMut(&Room<T, S>) {
386            for room in &self.rooms {
387                f(room);
388            }
389        }
390
391        /// iterates through every room and does something with them mutably. You can mutate everything belong to it. But warning, for now, you cannot send messages to client from it right now and until async closures will be stable probably we're not be able to do it. Because of that, we're not able to give examples for that.
392        pub async fn each_room_mut<F>(&mut self, mut f: F) where F: FnMut(&mut Room<T, S>) {
393            for room in &mut self.rooms {
394                f(room);
395            }
396        }
397
398        /// Get the Room with given id. If there is a risk of unextistance of the room, use ".check_room()" instead.
399        pub fn room(&mut self, id: &String) -> &mut Room<T, S> {
400            return self.rooms.iter_mut().find(|room| room.id == *id).unwrap();
401        }
402
403        /// check if a room with given id exist and wrap it in an option.
404        pub fn check_room(&mut self, id: &String) -> Option<&mut Room<T, S>> {
405            match self.rooms.iter_mut().find(|room| room.id == *id) {
406                Some(room) => Some(room),
407                None => None
408            }
409        }
410
411        /// only check if a room is exist and return a bool.
412        pub fn check(&self, id: &String) -> bool {
413            return self.rooms.iter().any(|room| room.id == *id);
414        }
415
416        /// it removes a room with given id and closes all the connections inside of it.
417        pub async fn remove_room(&mut self, id: &String) where T: Clone {
418            self.rooms.retain_mut(|room| { 
419                if room.id == *id {
420                    let _ = async {
421                        let _ = room.close(None).await;
422                    };
423                    
424                    false 
425                } else { 
426                    true 
427                }
428            });
429        }
430
431        /// it removes all empty rooms.
432        pub fn remove_empty_rooms(&mut self) {
433            self.rooms.retain(|room| { 
434                if room.connections.is_empty() { 
435                    false 
436                } else { 
437                    true 
438                }
439            });
440        }
441
442        /// Removes the connection from Room. Warning: Because the async closures are not stable yet, we cannot close the connection in that function, you have to make cleanup on your cadebase. For that, check the examples & Documentation.
443        pub fn remove_connection(&mut self, id: &String) -> Option<SplitSink<WebSocket<T, S>, Message<T>>> {
444            for room in &mut self.rooms {
445                if let Some(pos) = room.connections.iter().position(|connection| connection.id == *id) {
446                    let connection = room.connections.remove(pos);
447                    return Some(connection.receiver);
448                }
449            }
450            None
451        }
452    }
453
454    impl<T, S> Default for Broadcaster<T, S> { 
455        fn default() -> Self { 
456            Self { 
457                rooms: vec![], 
458            } 
459        }
460    }
461}
462
463pub mod normal {
464    use axum_8_4::{body::Bytes, extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket}};
465    use std::sync::Arc;
466    use tokio::sync::RwLock;
467    use futures_util::{sink::SinkExt, stream::{SplitSink, SplitStream, StreamExt}};
468
469    /// main broadcaster for normal api.
470    #[derive(Debug)]
471    pub struct Broadcaster {
472        pub rooms: Vec<Room>
473    }
474
475    /// room implementation.
476    #[derive(Debug)]
477    pub struct Room {
478        pub id: String,
479        pub connections: Vec<Connection>
480    }
481
482    /// type for each individual connection.
483    #[derive(Debug)]
484    pub struct Connection {
485        pub id: String,
486        pub receiver: SplitSink<WebSocket, Message>
487    }
488
489    impl Connection {
490        /// create a connection:
491        pub fn create(id: String, receiver: SplitSink<WebSocket, Message>) -> Self {
492            Self {
493                id, 
494                receiver
495            }
496        }
497
498        // send the text message.
499        pub async fn send(&mut self, message: Utf8Bytes) -> Result<(), axum_8_4::Error> {
500            match self.receiver.send(Message::Text(message)).await {
501                Ok(_) => Ok(()),
502                Err(error) => Err(error)
503            }
504        }
505
506        /// send the text message if the given condition in it's closure is true.
507        pub async fn send_if<F>(&mut self, message: Utf8Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
508            if condition(&self) { 
509                let _ = self.receiver.send(Message::Text(message)).await;
510            } 
511        }
512
513        /// sen the text message if the given condition in it's closure is true.
514        pub async fn send_if_not<F>(&mut self, message: Utf8Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
515            if !condition(&self) { 
516                let _ = self.receiver.send(Message::Text(message)).await;
517            } 
518        }
519
520        /// send the ping.
521        pub async fn ping(&mut self, message: Bytes) -> Result<(), axum_8_4::Error> {
522            match self.receiver.send(Message::Ping(message)).await {
523                Ok(_) => Ok(()),
524                Err(error) => Err(error)
525            }
526        }
527
528        /// send the ping if the given condition in it's closure is true.
529        pub async fn ping_if<F>(&mut self, message: Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
530            if condition(&self) { 
531                let _ = self.receiver.send(Message::Ping(message)).await;
532            } 
533        }
534
535        /// sen the ping if the given condition in it's closure is true.
536        pub async fn ping_if_not<F>(&mut self, message: Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
537            if !condition(&self) { 
538                let _ = self.receiver.send(Message::Ping(message)).await;
539            } 
540        }
541
542        /// send the pong.
543        pub async fn pong(&mut self, message: Bytes) -> Result<(), axum_8_4::Error> {
544            match self.receiver.send(Message::Pong(message)).await {
545                Ok(_) => Ok(()),
546                Err(error) => Err(error)
547            }
548        }
549
550        /// send the pong if the given condition in it's closure is true.
551        pub async fn pong_if<F>(&mut self, message: Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
552            if condition(&self) { 
553                let _ = self.receiver.send(Message::Pong(message)).await;
554            } 
555        }
556
557        /// sen the pong if the given condition in it's closure is true.
558        pub async fn pong_if_not<F>(&mut self, message: Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
559            if !condition(&self) { 
560                let _ = self.receiver.send(Message::Pong(message)).await;
561            } 
562        }
563    }
564
565    impl Room {
566        /// check if a connection with given id exist and if it's not, add a connection to a room with that ip:
567        pub fn add_connection(&mut self, id: &String, receiver: SplitSink<WebSocket, Message>) {
568            let check_is_connection_exist = self.connections.iter().any(|room| room.id == *id);
569
570            match check_is_connection_exist {
571                true => (),
572                false => {
573                    let connection = Connection {
574                        id: id.clone(),
575                        receiver
576                    };
577
578                    self.connections.push(connection);
579                }
580            }
581        }
582
583        /// remove a connection from room with given id.
584        pub fn remove_connection(&mut self, id: String) {
585            self.connections.retain(|connection| { 
586                if connection.id == id { 
587                    false 
588                } else { 
589                    true 
590                }
591            });
592        }
593
594        /// check if a connection exist and return if it's in an option.
595        pub fn check_connection(&mut self, id: &String) -> Option<&Connection> {
596            let connection = self.connections.iter().find(|room| room.id == *id);
597
598            match connection {
599                Some(connection) => Some(connection),
600                None => None
601            }
602        }
603
604        /// Broadcast the message directly.
605        pub async fn broadcast(&mut self, message: &Utf8Bytes) { 
606            for connection in &mut self.connections { 
607                let msg = Message::Text(message.clone());
608                let receiver = &mut connection.receiver; 
609                
610                let _ = receiver.send(msg).await;
611            }
612        }
613
614        /// broadcast the message if the given condition in it's closure is true.
615        pub async fn broadcast_if<F>(&mut self, message: &Utf8Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
616            for connection in &mut self.connections { 
617                if condition(connection) { 
618                    let msg = Message::Text(message.clone()); 
619                    let receiver = &mut connection.receiver; 
620                    let _ = receiver.send(msg).await;
621                } 
622            } 
623        }
624
625        /// broadcast the message if the given condition in it's closure is false.
626        pub async fn broadcast_if_not<F>(&mut self, message: &Utf8Bytes, condition: F) where F: Fn(&Connection) -> bool { 
627            for connection in &mut self.connections { 
628                if !condition(connection) { 
629                    let msg = Message::Text(message.clone()); 
630                    let receiver = &mut connection.receiver; 
631                    let _ = receiver.send(msg).await;
632                } 
633            } 
634        }
635
636        /// Broadcast the ping message directly.
637        pub async fn ping(&mut self, bytes: &Bytes) { 
638            for connection in &mut self.connections { 
639                let msg = Message::Ping(bytes.clone());
640                let receiver = &mut connection.receiver; 
641                        
642                let _ = receiver.send(msg).await;
643            }
644        }
645        
646        /// broadcast the ping message if the given condition in it's closure is true.
647        pub async fn ping_if<F>(&mut self, bytes: &Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
648            for connection in &mut self.connections { 
649                if condition(connection) { 
650                    let msg = Message::Ping(bytes.clone());
651                    let receiver = &mut connection.receiver; 
652                    let _ = receiver.send(msg).await;
653                } 
654            } 
655        }
656        
657        /// broadcast the ping message if the given condition in it's closure is false.
658        pub async fn ping_if_not<F>(&mut self, bytes: &Bytes, condition: F) where F: Fn(&Connection) -> bool { 
659            for connection in &mut self.connections { 
660                if !condition(connection) { 
661                    let msg = Message::Ping(bytes.clone()); 
662                    let receiver = &mut connection.receiver; 
663                    let _ = receiver.send(msg).await;
664                } 
665            } 
666        }
667
668        /// Broadcast the pong message directly.
669        pub async fn pong(&mut self, bytes: &Bytes) { 
670            for connection in &mut self.connections { 
671                let msg = Message::Pong(bytes.clone());
672                let receiver = &mut connection.receiver; 
673                                
674                let _ = receiver.send(msg).await;
675            }
676        }
677                
678        /// broadcast the pong message if the given condition in it's closure is true.
679        pub async fn pong_if<F>(&mut self, bytes: &Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
680            for connection in &mut self.connections { 
681                if condition(connection) { 
682                    let msg = Message::Pong(bytes.clone());
683                    let receiver = &mut connection.receiver; 
684                    let _ = receiver.send(msg).await;
685                } 
686            } 
687        }
688                
689        /// broadcast the pong message if the given condition in it's closure is false.
690        pub async fn pong_if_not<F>(&mut self, bytes: &Bytes, condition: F) where F: Fn(&Connection) -> bool { 
691            for connection in &mut self.connections { 
692                if !condition(connection) { 
693                    let msg = Message::Pong(bytes.clone()); 
694                    let receiver = &mut connection.receiver; 
695                    let _ = receiver.send(msg).await;
696                } 
697            } 
698        }
699
700        /// Broadcast the raw binary bytes directly.
701        pub async fn binary(&mut self, bytes: &Bytes) { 
702            for connection in &mut self.connections { 
703                let msg = Message::Binary(bytes.clone());
704                let receiver = &mut connection.receiver; 
705                                        
706                let _ = receiver.send(msg).await;
707            }
708        }
709
710        /// broadcast the raw binary bytes if the given condition in it's closure is true.
711        pub async fn binary_if<F>(&mut self, bytes: &Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
712            for connection in &mut self.connections { 
713                if condition(connection) { 
714                    let msg = Message::Binary(bytes.clone());
715                    let receiver = &mut connection.receiver; 
716                    let _ = receiver.send(msg).await;
717                } 
718            } 
719        }
720
721        /// broadcast the raw binary bytes if the given condition in it's closure is false.
722        pub async fn binary_if_not<F>(&mut self, bytes: &Bytes, condition: F) where F: Fn(&Connection) -> bool, { 
723            for connection in &mut self.connections { 
724                if !condition(connection) { 
725                    let msg = Message::Binary(bytes.clone());
726                    let receiver = &mut connection.receiver; 
727                    let _ = receiver.send(msg).await;
728                } 
729            } 
730        }
731
732        /// Close all connections and remove it from it's room but not close it.
733        pub async fn close(&mut self, close_frame: Option<CloseFrame>) { 
734            self.connections.retain_mut(|connection| {
735                let msg = Message::Close(close_frame.clone());
736                let receiver = &mut connection.receiver; 
737                                                
738                let _ = async {
739                    let _ = receiver.send(msg).await;
740                };
741                
742                false
743            });
744        }
745
746        /// it's most convenient way to close a single connection but keeping room open.
747        pub async fn close_conn(&mut self, close_frame: Option<CloseFrame>, id: &String) {
748            self.connections.retain_mut(|connection| {
749                if connection.id == *id {
750                    let msg = Message::Close(close_frame.clone());
751                    let receiver = &mut connection.receiver; 
752                                                    
753                    let _ = async {
754                        let _ = receiver.send(msg).await;
755                    };
756                    
757                    false
758                } else {
759                    true
760                }
761            });
762        }
763
764        /// close each connection and remove them from room if the given condition in it's closure is true.
765        pub async fn close_if<F>(&mut self, close_frame: Option<CloseFrame>, condition: F) where F: Fn(&Connection) -> bool, { 
766            self.connections.retain_mut(|connection| {
767                if condition(&connection) {
768                    let msg = Message::Close(close_frame.clone());
769                    let receiver = &mut connection.receiver; 
770                                                    
771                    let _ = async {
772                        let _ =  receiver.send(msg).await;
773                    };
774                    
775                    false
776                } else {
777                    true
778                }
779            });
780        }
781
782        /// close each connection and remove them from room if the given condition in it's closure is false.
783        pub async fn close_if_not<F>(&mut self, close_frame: Option<CloseFrame>, condition: F) where F: Fn(&Connection) -> bool, { 
784            self.connections.retain_mut(|connection| {
785                if !condition(&connection) {
786                    let msg = Message::Close(close_frame.clone());
787                    let receiver = &mut connection.receiver; 
788                                                            
789                    let _ = async {
790                        let _ =  receiver.send(msg).await;
791                    };
792                            
793                    false
794                } else {
795                    true
796                }
797            });
798        }
799    }
800
801    impl Broadcaster {
802        /// create new broadcaster.
803        pub fn new() -> Arc<RwLock<Self>> {
804            Arc::new(RwLock::new(Self::default()))
805        }
806
807        /// get receiver and stream, similar to ".handle()" method of actix-ws.
808        pub fn configure(socket: WebSocket) -> (SplitSink<WebSocket, Message>, SplitStream<WebSocket>) {
809            socket.split()
810        }
811
812        /// handle the all thing. If you use that api, there is no need to any other configuration for grouping and identifying connections:
813        pub async fn handle(broadcaster: &Arc<RwLock<Self>>, room_id: &String, conn_id: &String, receiver: SplitSink<WebSocket, Message>) -> Arc<RwLock<Self>> {
814            let mut broadcaster_write = broadcaster.write().await;
815
816            broadcaster_write.handle_room(room_id).add_connection(conn_id, receiver);
817
818            Arc::clone(&broadcaster)
819        }
820
821        /// check if a room with given id exist and if it's not create one:
822        pub fn handle_room(&mut self, id: &String) -> &mut Room {
823            if let Some(index) = self.rooms.iter().position(|room| room.id == *id) {
824                return &mut self.rooms[index];
825            }
826        
827            self.rooms.push(Room {
828                id: id.clone(),
829                connections: vec![],
830            });
831        
832            self.rooms.last_mut().unwrap()
833        }
834
835        /// Get the Room with given id. If there is a risk of unextistance of the room, use ".check_room()" instead.
836        pub fn room(&mut self, id: &String) -> &mut Room {
837            return self.rooms.iter_mut().find(|room| room.id == *id).unwrap();
838        }
839
840        /// iterates through every room and does something with them immutably. You cannot mutate anything inside of it, even rooms and not captured variables.      
841        /// 
842        /// ```rust
843        /// 
844        /// use axum_wsb::normal::Broadcaster;
845        /// use tokio::sync::RwLock;
846        /// use std::sync::Arc;
847        /// 
848        /// fn main () {
849        ///     let receivers: Arc<RwLock<Broadcaster>> = Broadcaster::new();
850        /// 
851        ///     async {
852        ///         receivers.read().await.each_room_immut(|room| println!("hello, {}. guest!", room.id));
853        ///     };
854        /// 
855        /// }
856        /// 
857        /// 
858        /// ```
859        pub fn each_room_immut<F>(&self, f: F) where F: Fn(&Room) {
860            for room in &self.rooms {
861                f(room);
862            }
863        }
864
865        /// iterates through every room and does something with them immutably. You cannot mutate rooms itself but can mutate captured variables.
866        /// 
867        /// ```rust
868        /// 
869        /// use axum_wsb::normal::Broadcaster;
870        /// use tokio::sync::RwLock;
871        /// use std::sync::Arc;
872        /// 
873        /// fn main () {
874        ///     let receivers: Arc<RwLock<Broadcaster>> = Broadcaster::new();
875        /// 
876        ///     let mut num = 0;
877        /// 
878        ///     async {
879        ///         receivers.read().await.each_room(|room| {
880        ///             num = num + 1;
881        ///         });
882        ///     };
883        /// 
884        /// 
885        ///     println!("here is number: {}", num)
886        /// }
887        /// 
888        /// 
889        /// ```
890        pub fn each_room<F>(&self, mut f: F) where F: FnMut(&Room) {
891            for room in &self.rooms {
892                f(room);
893            }
894        }
895
896        /// iterates through every room and does something with them mutably. You can mutate everything belong to it. But warning, for now, you cannot send messages to client from it right now and until async closures will be stable probably we're not be able to do it. Because of that, we're not able to give examples for that.
897        pub async fn each_room_mut<F>(&mut self, mut f: F) where F: FnMut(&mut Room) {
898            for room in &mut self.rooms {
899                f(room);
900            }
901        }
902
903        /// check if a room with given id exist and wrap it in an option.
904        pub fn check_room(&mut self, id: &String) -> Option<&mut Room> {
905            match self.rooms.iter_mut().find(|room| room.id == *id) {
906                Some(room) => Some(room),
907                None => None
908            }
909        }
910
911        /// only check if a room exist and if it's return true.
912        pub fn check(&self, id: &String) -> bool {
913            return self.rooms.iter().any(|room| room.id == *id);
914        }
915
916        /// it removes a room with given id and closes all the connections inside of it.
917        pub async fn remove_room(&mut self, id: &String) {
918            self.rooms.retain_mut(|room| { 
919                if room.id == *id {
920                    let _ = async {
921                        let _ = room.close(None).await;
922                    };
923                    
924                    false 
925                } else { 
926                    true 
927                }
928            });
929        }
930
931        /// it removes all empty rooms.
932        pub fn remove_empty_rooms(&mut self) {
933            self.rooms.retain(|room| { 
934                if room.connections.is_empty() { 
935                    false 
936                } else { 
937                    true 
938                }
939            });
940        }
941
942        /// Removes the connection from Room. Warning: Because the async closures are not stable yet, we cannot close the connection in that function, you have to make cleanup on your cadebase. For that, check the examples & Documentation.
943        pub fn remove_connection(&mut self, id: &String) -> Option<SplitSink<WebSocket, Message>> {
944            for room in &mut self.rooms {
945                if let Some(pos) = room.connections.iter().position(|connection| connection.id == *id) {
946                    let connection = room.connections.remove(pos);
947                    return Some(connection.receiver);
948                }
949            }
950            None
951        }
952    }
953
954    impl Default for Broadcaster { 
955        fn default() -> Self { 
956            Self { 
957                rooms: vec![], 
958            } 
959        }
960    }
961}