actix_wsb/
lib.rs

1use std::sync::{Arc, RwLock};
2use actix_ws::{CloseReason, Item, Session};
3use actix_web::web::Bytes;
4
5#[derive(Clone)]
6pub struct Connection {
7    pub id: String,
8    pub session: Session
9}
10
11#[derive(Clone)]
12pub struct Room {
13    pub id: String,
14    pub connectors: Vec<Connection>
15}
16
17#[derive(Clone)]
18pub struct Broadcaster {
19    pub rooms: Vec<Room>
20}
21
22impl Connection {
23    /// creates a single connection.
24    pub fn create(id: String, session: Session) -> Self {
25        Self {
26            id, 
27            session
28        }
29    }
30
31    /// sends message from single connection.
32    pub async fn send(&mut self, message: String) -> () {
33        self.session.text(message).await.unwrap();
34    }
35
36    /// sends message from single connection if given condition is true.
37    pub async fn send_if<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool {
38        if condition(&self) {
39            self.session.text(message).await.unwrap();
40        }
41    }
42
43    /// */ sends message from single connection if given condition is false.
44    pub async fn send_if_not<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool {
45        if !condition(&self) {
46            self.session.text(message).await.unwrap();
47        }
48    }
49
50    /// sends a ping message from single connection.
51    pub async fn ping(&mut self, bytes: &Vec<u8>) -> () {
52        self.session.ping(bytes).await.unwrap();
53    }
54
55    /// sends a ping message from single connection if given condition is true.
56    pub async fn ping_if<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
57        if condition(&self) {
58            self.session.ping(bytes).await.unwrap();
59        }
60    }
61
62    /// */ sends a ping message from single connection if given condition is false.
63    pub async fn ping_if_not<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
64        if !condition(&self) {
65            self.session.ping(bytes).await.unwrap();
66        }
67    }
68
69    /// sends a pong message from single connection.
70    pub async fn pong(&mut self, bytes: &Vec<u8>) -> () {
71        self.session.pong(bytes).await.unwrap();
72    }
73
74    /// sends a pong message from single connection if given condition is true.
75    pub async fn pong_if<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
76        if condition(&self) {
77            self.session.pong(bytes).await.unwrap();
78        }
79    }
80
81    /// sends a pong message from single connection if given condition is false.
82    pub async fn pong_if_not<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
83        if !condition(&self) {
84            self.session.pong(bytes).await.unwrap();
85        }
86    }
87
88    /// sends a pong message from single connection.
89    pub async fn binary(&mut self, bytes: Bytes) -> () {
90        self.session.binary(bytes).await.unwrap();
91    }
92
93    /// sends a pong message from single connection if given condition is true.
94    pub async fn binary_if<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool {
95        if condition(&self) {
96            self.session.binary(bytes).await.unwrap();
97        }
98    }
99
100    /// sends a pong message from single connection if given condition is false.
101    pub async fn binary_if_not<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool {
102        if !condition(&self) {
103            self.session.binary(bytes).await.unwrap();
104        }
105    }
106
107    /// sends a continuation message from single connection with given type.
108    pub async fn continuation(&mut self, item: Item) -> () {
109        match item {
110            Item::FirstText(ref text) => {
111                let text = text;
112                self.session.continuation(Item::FirstText(text.clone())).await.unwrap()
113            },
114            Item::FirstBinary(ref binary) => {
115                let binary = binary;
116                self.session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
117            },
118            Item::Continue(ref cont_msg) => {
119                let cont_msg = cont_msg;
120                self.session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
121            },
122            Item::Last(ref last_msg) => {
123                let last_msg = last_msg;
124                self.session.continuation(Item::Last(last_msg.clone())).await.unwrap()
125            }
126        }
127    }
128
129    /// sends a continuation message from single connection with given type if given condition is true.
130    pub async fn continuation_if<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool { 
131        if condition(&self) { 
132            match item {
133                Item::FirstText(ref text) => {
134                    let text = text;
135                    self.session.continuation(Item::FirstText(text.clone())).await.unwrap()
136                },
137                Item::FirstBinary(ref binary) => {
138                    let binary = binary;
139                    self.session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
140                },
141                Item::Continue(ref cont_msg) => {
142                    let cont_msg = cont_msg;
143                    self.session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
144                },
145                Item::Last(ref last_msg) => {
146                    let last_msg = last_msg;
147                    self.session.continuation(Item::Last(last_msg.clone())).await.unwrap()
148                }
149            }
150        } 
151    }
152
153    /// sends a continuation message from single connection with given type if given condition is false.
154    pub async fn continuation_if_not<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool { 
155        if !condition(&self) { 
156            match item {
157                Item::FirstText(ref text) => {
158                    let text = text;
159                    self.session.continuation(Item::FirstText(text.clone())).await.unwrap()
160                },
161                Item::FirstBinary(ref binary) => {
162                    let binary = binary;
163                    self.session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
164                },
165                Item::Continue(ref cont_msg) => {
166                    let cont_msg = cont_msg;
167                    self.session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
168                },
169                Item::Last(ref last_msg) => {
170                    let last_msg = last_msg;
171                    self.session.continuation(Item::Last(last_msg.clone())).await.unwrap()
172                }
173            }
174        } 
175    }
176}
177
178impl Room {
179    /// checks if a connection with given id exist and if it's not add a connection with given id and Session to a room.
180    pub fn add_connection(&mut self, id: &String, session: Session) {
181        let check_is_connection_exist = self.connectors.iter().any(|room| room.id == *id);
182
183        match check_is_connection_exist {
184            true => (),
185            false => {
186                let connection = Connection {
187                    id: id.clone(),
188                    session
189                };
190
191                self.connectors.push(connection);
192            }
193        }
194    }
195
196    /// removes if a connection with given id exist.
197    pub fn remove_connection(&mut self, id: String) {
198        self.connectors.retain(|connection| { 
199            if connection.id == id { 
200                false 
201            } else { 
202                true 
203            }
204        });
205    }
206
207    /// checks if a connection exist and returns it as an option.
208    pub fn check_connection(&mut self, id: &String) -> Option<Connection> {
209        let connection = self.connectors.iter().find(|room| room.id == *id);
210
211        match connection {
212            Some(connection) => Some(connection.clone()),
213            None => None
214        }
215    }
216
217    /// broadcastes the message to all room connectors.
218    pub async fn broadcast(&mut self, message: String) {
219        for connection in &mut self.connectors { 
220            let message = message.clone(); 
221            let session = &mut connection.session; 
222            
223            let _ = session.text(message).await;
224        }
225    }
226
227    /// broadcastes the message if given condition for connection instances is true.
228    pub async fn broadcast_if<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool { 
229        for connection in &mut self.connectors { 
230            if condition(connection) { 
231                let message = message.clone(); 
232                let session = &mut connection.session; 
233                let _ = session.text(message).await;
234            } 
235        } 
236    }
237
238    /// broadcastes the message if given condition for connection instances is false.
239    pub async fn broadcast_if_not<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool { 
240        for connection in &mut self.connectors { 
241            if !condition(connection) { 
242                let message = message.clone(); 
243                let session = &mut connection.session; 
244                let _ = session.text(message).await;
245            } 
246        } 
247    }
248
249    /// broadcastes the ping to all room connectors.
250    pub async fn ping(&mut self, bytes: Vec<u8>) { 
251        for connection in &mut self.connectors { 
252            let message = &bytes; 
253            let session = &mut connection.session; 
254                
255            let _ = session.ping(message).await;
256        }
257    }
258
259    /// broadcastes the ping if given condition for connection instances is true.
260    pub async fn ping_if<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool { 
261        for connection in &mut self.connectors { 
262            if condition(connection) { 
263                let message = &bytes; 
264                let session = &mut connection.session; 
265                let _ = session.ping(message).await;
266            } 
267        } 
268    }
269
270    /// broadcastes the ping if given condition for connection instances is false.
271    pub async fn ping_if_not<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool { 
272        for connection in &mut self.connectors { 
273            if !condition(connection) { 
274                let message = &bytes; 
275                let session = &mut connection.session; 
276                let _ = session.ping(message).await;
277            } 
278        } 
279    }
280
281    /// broadcastes the pong to all room connectors.
282    pub async fn pong(&mut self, bytes: Vec<u8>) { 
283        for connection in &mut self.connectors { 
284            let message = &bytes; 
285            let session = &mut connection.session; 
286                
287            let _ = session.pong(message).await;
288        }
289    }
290
291    /// broadcastes the pong if given condition for connection instances is true.
292    pub async fn pong_if<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool { 
293        for connection in &mut self.connectors { 
294            if condition(connection) { 
295                let message = &bytes; 
296                let session = &mut connection.session; 
297                let _ = session.pong(message).await;
298            } 
299        } 
300    }
301
302    /// broadcastes the pong if given condition for connection instances is false.
303    pub async fn pong_if_not<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool { 
304        for connection in &mut self.connectors { 
305            if !condition(connection) { 
306                let message = &bytes; 
307                let session = &mut connection.session; 
308                let _ = session.pong(message).await;
309            } 
310        } 
311    }
312
313    /// Broadcastes the raw binary bytes to all room connectors.
314    pub async fn binary(&mut self, bytes: Bytes) { 
315        for connection in &mut self.connectors { 
316            let message = bytes.clone();
317            let session = &mut connection.session; 
318                
319            let _ = session.binary(message).await;
320        }
321    }
322
323    /// broadcastes the raw binary bytes if given condition for connection instances is true.
324    pub async fn binary_if<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool { 
325        for connection in &mut self.connectors { 
326            if condition(connection) { 
327                let message = bytes.clone(); 
328                let session = &mut connection.session; 
329                let _ = session.binary(message).await;
330            } 
331        } 
332    }
333
334    /// broadcastes the raw binary bytes if given condition for connection instances is false.
335    pub async fn binary_if_not<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool { 
336        for connection in &mut self.connectors { 
337            if !condition(connection) { 
338                let message = bytes.clone(); 
339                let session = &mut connection.session; 
340                let _ = session.binary(message).await;
341            } 
342        } 
343    }
344
345    /// Broadcastes the continuation message to all room connectors.
346    pub async fn continuation(&mut self, item: Item) { 
347        for connection in &mut self.connectors { 
348            let session = &mut connection.session; 
349
350            match item {
351                Item::FirstText(ref text) => {
352                    let text = text;
353                    session.continuation(Item::FirstText(text.clone())).await.unwrap()
354                },
355                Item::FirstBinary(ref binary) => {
356                    let binary = binary;
357                    session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
358                },
359                Item::Continue(ref cont_msg) => {
360                    let cont_msg = cont_msg;
361                    session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
362                },
363                Item::Last(ref last_msg) => {
364                    let last_msg = last_msg;
365                    session.continuation(Item::Last(last_msg.clone())).await.unwrap()
366                }
367            }
368        }
369    }
370
371    /// broadcastes the continuation messages if given condition for connection instances is true.
372    pub async fn continuation_if<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool { 
373        for connection in &mut self.connectors { 
374            if condition(connection) { 
375                let session = &mut connection.session; 
376
377                match item {
378                    Item::FirstText(ref text) => {
379                        let text = text;
380                        session.continuation(Item::FirstText(text.clone())).await.unwrap()
381                    },
382                    Item::FirstBinary(ref binary) => {
383                        let binary = binary;
384                        session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
385                    },
386                    Item::Continue(ref cont_msg) => {
387                        let cont_msg = cont_msg;
388                        session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
389                    },
390                    Item::Last(ref last_msg) => {
391                        let last_msg = last_msg;
392                        session.continuation(Item::Last(last_msg.clone())).await.unwrap()
393                    }
394                }
395            } 
396        } 
397    }
398
399    /// broadcastes the continuation messages if given condition for connection instances is false.
400    pub async fn continuation_if_not<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool { 
401        for connection in &mut self.connectors { 
402            if !condition(connection) { 
403                let session = &mut connection.session; 
404    
405                match item {
406                    Item::FirstText(ref text) => {
407                        let text = text;
408                        session.continuation(Item::FirstText(text.clone())).await.unwrap()
409                    },
410                    Item::FirstBinary(ref binary) => {
411                        let binary = binary;
412                        session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
413                    },
414                    Item::Continue(ref cont_msg) => {
415                        let cont_msg = cont_msg;
416                        session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
417                    },
418                    Item::Last(ref last_msg) => {
419                        let last_msg = last_msg;
420                        session.continuation(Item::Last(last_msg.clone())).await.unwrap()
421                    }
422                }
423            } 
424        } 
425    }
426
427    /// closes the connection with given id and removes it from it's room. This is the convenient way of closing a connection.
428    /// 
429    /// ```rust
430    /// 
431    /// Message::Close(reason) => {
432    ///     let _ = get_broadcaster.write().unwrap().room(&room_id).close_conn(reason, &id).await;
433    /// 
434    ///     break;
435    /// },
436    /// 
437    /// ```
438    pub async fn close_conn(&mut self, reason: Option<CloseReason>, id: &String) {
439        self.connectors.retain(|conn| {
440            if conn.id == *id {
441                let reason = reason.clone();
442                    
443                let _ = async {
444                    let _ = conn.session.clone().close(reason).await;
445                };
446        
447                false
448            } else {
449                true
450            }
451        });
452    }
453
454    /// closes all the connections and entire room. Warning: it closes all connections but keeps room open, if you want to close all the connections directly, use the `.remove_room()` method of the Broadcaster struct instead.
455    pub async fn close(&mut self, reason: Option<CloseReason>) {
456        self.connectors.retain(|conn| {
457            let reason = reason.clone();
458                
459            let _ = async {
460                let _ = conn.session.clone().close(reason).await;
461            };
462
463            false
464        });
465    }
466    
467    /// closes the connection and removes it from room if given condition for connection instances is true. Room still stay open.
468    pub async fn close_if<F>(&mut self, reason: Option<CloseReason>, condition: F) where F: Fn(&Connection) -> bool { 
469        self.connectors.retain(|connection| {
470            if condition(connection) {
471                let reason = reason.clone();
472                
473                let _ = async {
474                    let _ = connection.session.clone().close(reason).await;
475                };
476    
477                false
478            } else {
479                true
480            }
481        });
482    }
483    
484    /// closes the connection and removes it from room if given condition for connection instances is false. Room still stay open.
485    pub async fn close_if_not<F>(&mut self, reason: Option<CloseReason>, condition: F) where F: Fn(&Connection) -> bool { 
486        self.connectors.retain(|connection| {
487            if !condition(connection) {
488                let reason = reason.clone();
489                
490                let _ = async {
491                    let _ = connection.session.clone().close(reason).await;
492                };
493    
494                false
495            } else {
496                true
497            }
498        });
499    }
500}
501
502impl Broadcaster {
503    /// create a new broadcaster instance. 
504    pub fn new() -> Arc<RwLock<Self>> { 
505        Arc::new(RwLock::new(Self::default())) 
506    }
507
508    /// does all the setup basically. You don't have to use other functions for all the grouping of rooms and connections. You can give the same room id for all instances if you don't want to seperate communication groups. But you have to give different connection id's to each session, otherwise it'll introduce bugs.
509    ///     let id = query.id.as_ref().unwrap().to_string();
510    /// 
511    ///```rust
512    /// 
513    /// let id = query.id.as_ref().unwrap().to_string();
514    /// let room_id = query.room.as_ref().unwrap().to_string();
515    ///
516    /// let get_broadcaster = Broadcaster::handle(&broadcaster, &room_id, &id, session);
517    /// 
518    ///```
519    pub fn handle(broadcaster: &Arc<RwLock<Self>>, room_id: &String, conn_id: &String, session: Session) -> Arc<RwLock<Self>> {
520        let mut broadcaster_write = broadcaster.write().unwrap();
521
522        broadcaster_write.handle_room(room_id).add_connection(conn_id, session);
523
524        Arc::clone(&broadcaster)
525    }
526    
527    /// this function check if a room exist and if it's exist returns it, if it's not then creates it. If you just want to check if a room exist, use .check() instead.
528    /// 
529    ///```rust
530    /// 
531    /// let mut broadcaster_write = broadcaster.write().unwrap();
532    /// 
533    /// let room_id = "1".to_string();
534    ///
535    /// broadcaster_write.handle_room(&room_id)
536    /// 
537    ///```
538    /// 
539    pub fn handle_room(&mut self, id: &String) -> &mut Room {
540        if let Some(index) = self.rooms.iter().position(|room| room.id == *id) {
541            return &mut self.rooms[index];
542        }
543    
544        self.rooms.push(Room {
545            id: id.clone(),
546            connectors: vec![],
547        });
548    
549        self.rooms.last_mut().unwrap()
550    }
551
552    /// it scans a room with given id and it returns it if it's exist. if there is a risk that room isn't exist than use ".check_room()"
553    pub fn room(&mut self, id: &String) -> &mut Room {
554        return self.rooms.iter_mut().find(|room| room.id == *id).unwrap();
555    }
556
557    /// checks a room and if it's exist, returns a mutable reference of that room.
558    pub fn check_room(&mut self, id: &String) -> Option<&mut Room> {
559        match self.rooms.iter_mut().find(|room| room.id == *id) {
560            Some(room) => Some(room),
561            None => None
562        }
563    }
564
565    /// it returns room if exist with given ip. Use .handle_room() method if you want to create a room with given id.
566    pub fn check(&self, id: &String) -> bool {
567        return self.rooms.iter().any(|room| room.id == *id);
568    }
569
570    /// iterates through every room and does something with them immutably. You cannot mutate anything inside of it, even rooms and not captured variables.      
571    /// 
572    /// ```rust
573    /// 
574    /// use actix_wsb::Broadcaster;
575    /// 
576    /// fn main () {
577    ///     let broadcaster = Broadcaster::new();
578    /// 
579    ///     broadcaster.read().unwrap().each_room_immut(|room| println!("hello, {}. guest!", room.id));
580    /// }
581    /// 
582    /// 
583    /// ```
584    pub fn each_room_immut<F>(&self, f: F) where F: Fn(&Room) {
585        for room in &self.rooms {
586            f(room);
587        }
588    }
589    
590    /// iterates through every room and does something with them immutably. You cannot mutate rooms itself but can mutate captured variables.
591    /// 
592    /// ```rust
593    /// 
594    /// use actix_wsb::Broadcaster;
595    /// 
596    /// fn main () {
597    ///     let broadcaster = Broadcaster::new();
598    /// 
599    ///     let mut num = 0;
600    /// 
601    ///     broadcaster.read().unwrap().each_room(|room| {
602    ///         num = num + 1;
603    ///     });
604    /// 
605    ///     println!("here is number: {}", num)
606    /// }
607    /// 
608    /// 
609    /// ```
610    pub fn each_room<F>(&self, mut f: F) where F: FnMut(&Room) {
611        for room in &self.rooms {
612            f(room);
613        }
614    }
615
616    /// iterates through every room and does something with them mutably. You can mutate everything belong to it. But warning, for now, you cannot send messages to client from it right now and until async closures will be stable probably we're not be able to do it. Because of that, we're not able to give examples for that.
617    pub async fn each_room_mut<F>(&mut self, mut f: F) where F: FnMut(&mut Room) {
618        for room in &mut self.rooms {
619            f(room);
620        }
621    }
622
623    /// it removes a room with given id.
624    /// 
625    /// 
626    /// ```rust 
627    /// Message::Close(reason) => {
628    ///     // warning, that closes and removes all the connections but not removes the room: 
629    ///     //let _ = get_broadcaster.write().unwrap().room(room_id.clone()).close(reason).await;
630    ///                
631    ///     // if you want to remove a room with removing all the connections, use this instead:
632    ///     // let _ = get_broadcaster.write().unwrap().remove_room(room_id.clone()).await;
633    ///
634    ///     let _ = get_broadcaster.write().unwrap()
635    ///                                    .room(&room_id)
636    ///                                    .remove_room(reason, |conn| conn.id == *id).await;
637    ///                
638    ///     break;
639    ///  },
640    /// ```
641    ///
642    pub async fn remove_room(&mut self, id: String) {
643        self.rooms.retain(|room| { 
644            if room.id == id { 
645                let _ = async {
646                    room.clone().close(None).await;
647                };
648                
649                false 
650            } else { 
651                true 
652            }
653        });
654    }
655
656    /// it removes all empty rooms.
657    pub fn remove_empty_rooms(&mut self) {
658        self.rooms.retain(|room| { 
659            if room.connectors.is_empty() { 
660                false 
661            } else { 
662                true 
663            }
664        });
665    }
666
667    /// it removes a connection and returns the session struct of it. since async closures not stable yet, we cannot close the actual "Session" implementation in that method. For making that cleanup, we have to get actual Session implementation and close that connection manually - check out the example and readme.
668    /// This is the old way of removing connections. It'll not be removed but we don't recommend to use it unless you don't used it yet.
669    pub fn remove_connection(&mut self, id: String) -> Option<Session> {
670        for room in &mut self.rooms {
671            if let Some(pos) = room.connectors.iter().position(|connection| connection.id == id) {
672                let connection = room.connectors.remove(pos);
673                
674                return Some(connection.session);
675            }
676        }
677        None
678    }
679} 
680
681impl Default for Broadcaster { 
682    fn default() -> Self { 
683        Self { 
684            rooms: vec![], 
685        } 
686    }
687}