1use std::sync::{Arc, RwLock};
2use actix_ws::{CloseReason, Item, Session};
3use actix_web::web::Bytes;
4
5#[derive(Clone)]
6pub struct Connection {
7 pub id: String,
8 pub session: Session
9}
10
11#[derive(Clone)]
12pub struct Room {
13 pub id: String,
14 pub connectors: Vec<Connection>
15}
16
17#[derive(Clone)]
18pub struct Broadcaster {
19 pub rooms: Vec<Room>
20}
21
22impl Connection {
23 pub fn create(id: String, session: Session) -> Self {
25 Self {
26 id,
27 session
28 }
29 }
30
31 pub async fn send(&mut self, message: String) -> () {
33 self.session.text(message).await.unwrap();
34 }
35
36 pub async fn send_if<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool {
38 if condition(&self) {
39 self.session.text(message).await.unwrap();
40 }
41 }
42
43 pub async fn send_if_not<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool {
45 if !condition(&self) {
46 self.session.text(message).await.unwrap();
47 }
48 }
49
50 pub async fn ping(&mut self, bytes: &Vec<u8>) -> () {
52 self.session.ping(bytes).await.unwrap();
53 }
54
55 pub async fn ping_if<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
57 if condition(&self) {
58 self.session.ping(bytes).await.unwrap();
59 }
60 }
61
62 pub async fn ping_if_not<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
64 if !condition(&self) {
65 self.session.ping(bytes).await.unwrap();
66 }
67 }
68
69 pub async fn pong(&mut self, bytes: &Vec<u8>) -> () {
71 self.session.pong(bytes).await.unwrap();
72 }
73
74 pub async fn pong_if<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
76 if condition(&self) {
77 self.session.pong(bytes).await.unwrap();
78 }
79 }
80
81 pub async fn pong_if_not<F>(&mut self, bytes: &Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
83 if !condition(&self) {
84 self.session.pong(bytes).await.unwrap();
85 }
86 }
87
88 pub async fn binary(&mut self, bytes: Bytes) -> () {
90 self.session.binary(bytes).await.unwrap();
91 }
92
93 pub async fn binary_if<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool {
95 if condition(&self) {
96 self.session.binary(bytes).await.unwrap();
97 }
98 }
99
100 pub async fn binary_if_not<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool {
102 if !condition(&self) {
103 self.session.binary(bytes).await.unwrap();
104 }
105 }
106
107 pub async fn continuation(&mut self, item: Item) -> () {
109 match item {
110 Item::FirstText(ref text) => {
111 let text = text;
112 self.session.continuation(Item::FirstText(text.clone())).await.unwrap()
113 },
114 Item::FirstBinary(ref binary) => {
115 let binary = binary;
116 self.session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
117 },
118 Item::Continue(ref cont_msg) => {
119 let cont_msg = cont_msg;
120 self.session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
121 },
122 Item::Last(ref last_msg) => {
123 let last_msg = last_msg;
124 self.session.continuation(Item::Last(last_msg.clone())).await.unwrap()
125 }
126 }
127 }
128
129 pub async fn continuation_if<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool {
131 if condition(&self) {
132 match item {
133 Item::FirstText(ref text) => {
134 let text = text;
135 self.session.continuation(Item::FirstText(text.clone())).await.unwrap()
136 },
137 Item::FirstBinary(ref binary) => {
138 let binary = binary;
139 self.session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
140 },
141 Item::Continue(ref cont_msg) => {
142 let cont_msg = cont_msg;
143 self.session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
144 },
145 Item::Last(ref last_msg) => {
146 let last_msg = last_msg;
147 self.session.continuation(Item::Last(last_msg.clone())).await.unwrap()
148 }
149 }
150 }
151 }
152
153 pub async fn continuation_if_not<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool {
155 if !condition(&self) {
156 match item {
157 Item::FirstText(ref text) => {
158 let text = text;
159 self.session.continuation(Item::FirstText(text.clone())).await.unwrap()
160 },
161 Item::FirstBinary(ref binary) => {
162 let binary = binary;
163 self.session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
164 },
165 Item::Continue(ref cont_msg) => {
166 let cont_msg = cont_msg;
167 self.session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
168 },
169 Item::Last(ref last_msg) => {
170 let last_msg = last_msg;
171 self.session.continuation(Item::Last(last_msg.clone())).await.unwrap()
172 }
173 }
174 }
175 }
176}
177
178impl Room {
179 pub fn add_connection(&mut self, id: &String, session: Session) {
181 let check_is_connection_exist = self.connectors.iter().any(|room| room.id == *id);
182
183 match check_is_connection_exist {
184 true => (),
185 false => {
186 let connection = Connection {
187 id: id.clone(),
188 session
189 };
190
191 self.connectors.push(connection);
192 }
193 }
194 }
195
196 pub fn remove_connection(&mut self, id: String) {
198 self.connectors.retain(|connection| {
199 if connection.id == id {
200 false
201 } else {
202 true
203 }
204 });
205 }
206
207 pub fn check_connection(&mut self, id: &String) -> Option<Connection> {
209 let connection = self.connectors.iter().find(|room| room.id == *id);
210
211 match connection {
212 Some(connection) => Some(connection.clone()),
213 None => None
214 }
215 }
216
217 pub async fn broadcast(&mut self, message: String) {
219 for connection in &mut self.connectors {
220 let message = message.clone();
221 let session = &mut connection.session;
222
223 let _ = session.text(message).await;
224 }
225 }
226
227 pub async fn broadcast_if<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool {
229 for connection in &mut self.connectors {
230 if condition(connection) {
231 let message = message.clone();
232 let session = &mut connection.session;
233 let _ = session.text(message).await;
234 }
235 }
236 }
237
238 pub async fn broadcast_if_not<F>(&mut self, message: String, condition: F) where F: Fn(&Connection) -> bool {
240 for connection in &mut self.connectors {
241 if !condition(connection) {
242 let message = message.clone();
243 let session = &mut connection.session;
244 let _ = session.text(message).await;
245 }
246 }
247 }
248
249 pub async fn ping(&mut self, bytes: Vec<u8>) {
251 for connection in &mut self.connectors {
252 let message = &bytes;
253 let session = &mut connection.session;
254
255 let _ = session.ping(message).await;
256 }
257 }
258
259 pub async fn ping_if<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
261 for connection in &mut self.connectors {
262 if condition(connection) {
263 let message = &bytes;
264 let session = &mut connection.session;
265 let _ = session.ping(message).await;
266 }
267 }
268 }
269
270 pub async fn ping_if_not<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
272 for connection in &mut self.connectors {
273 if !condition(connection) {
274 let message = &bytes;
275 let session = &mut connection.session;
276 let _ = session.ping(message).await;
277 }
278 }
279 }
280
281 pub async fn pong(&mut self, bytes: Vec<u8>) {
283 for connection in &mut self.connectors {
284 let message = &bytes;
285 let session = &mut connection.session;
286
287 let _ = session.pong(message).await;
288 }
289 }
290
291 pub async fn pong_if<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
293 for connection in &mut self.connectors {
294 if condition(connection) {
295 let message = &bytes;
296 let session = &mut connection.session;
297 let _ = session.pong(message).await;
298 }
299 }
300 }
301
302 pub async fn pong_if_not<F>(&mut self, bytes: Vec<u8>, condition: F) where F: Fn(&Connection) -> bool {
304 for connection in &mut self.connectors {
305 if !condition(connection) {
306 let message = &bytes;
307 let session = &mut connection.session;
308 let _ = session.pong(message).await;
309 }
310 }
311 }
312
313 pub async fn binary(&mut self, bytes: Bytes) {
315 for connection in &mut self.connectors {
316 let message = bytes.clone();
317 let session = &mut connection.session;
318
319 let _ = session.binary(message).await;
320 }
321 }
322
323 pub async fn binary_if<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool {
325 for connection in &mut self.connectors {
326 if condition(connection) {
327 let message = bytes.clone();
328 let session = &mut connection.session;
329 let _ = session.binary(message).await;
330 }
331 }
332 }
333
334 pub async fn binary_if_not<F>(&mut self, bytes: Bytes, condition: F) where F: Fn(&Connection) -> bool {
336 for connection in &mut self.connectors {
337 if !condition(connection) {
338 let message = bytes.clone();
339 let session = &mut connection.session;
340 let _ = session.binary(message).await;
341 }
342 }
343 }
344
345 pub async fn continuation(&mut self, item: Item) {
347 for connection in &mut self.connectors {
348 let session = &mut connection.session;
349
350 match item {
351 Item::FirstText(ref text) => {
352 let text = text;
353 session.continuation(Item::FirstText(text.clone())).await.unwrap()
354 },
355 Item::FirstBinary(ref binary) => {
356 let binary = binary;
357 session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
358 },
359 Item::Continue(ref cont_msg) => {
360 let cont_msg = cont_msg;
361 session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
362 },
363 Item::Last(ref last_msg) => {
364 let last_msg = last_msg;
365 session.continuation(Item::Last(last_msg.clone())).await.unwrap()
366 }
367 }
368 }
369 }
370
371 pub async fn continuation_if<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool {
373 for connection in &mut self.connectors {
374 if condition(connection) {
375 let session = &mut connection.session;
376
377 match item {
378 Item::FirstText(ref text) => {
379 let text = text;
380 session.continuation(Item::FirstText(text.clone())).await.unwrap()
381 },
382 Item::FirstBinary(ref binary) => {
383 let binary = binary;
384 session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
385 },
386 Item::Continue(ref cont_msg) => {
387 let cont_msg = cont_msg;
388 session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
389 },
390 Item::Last(ref last_msg) => {
391 let last_msg = last_msg;
392 session.continuation(Item::Last(last_msg.clone())).await.unwrap()
393 }
394 }
395 }
396 }
397 }
398
399 pub async fn continuation_if_not<F>(&mut self, item: Item, condition: F) where F: Fn(&Connection) -> bool {
401 for connection in &mut self.connectors {
402 if !condition(connection) {
403 let session = &mut connection.session;
404
405 match item {
406 Item::FirstText(ref text) => {
407 let text = text;
408 session.continuation(Item::FirstText(text.clone())).await.unwrap()
409 },
410 Item::FirstBinary(ref binary) => {
411 let binary = binary;
412 session.continuation(Item::FirstBinary(binary.clone())).await.unwrap()
413 },
414 Item::Continue(ref cont_msg) => {
415 let cont_msg = cont_msg;
416 session.continuation(Item::Continue(cont_msg.clone())).await.unwrap()
417 },
418 Item::Last(ref last_msg) => {
419 let last_msg = last_msg;
420 session.continuation(Item::Last(last_msg.clone())).await.unwrap()
421 }
422 }
423 }
424 }
425 }
426
427 pub async fn close_conn(&mut self, reason: Option<CloseReason>, id: &String) {
439 self.connectors.retain(|conn| {
440 if conn.id == *id {
441 let reason = reason.clone();
442
443 let _ = async {
444 let _ = conn.session.clone().close(reason).await;
445 };
446
447 false
448 } else {
449 true
450 }
451 });
452 }
453
454 pub async fn close(&mut self, reason: Option<CloseReason>) {
456 self.connectors.retain(|conn| {
457 let reason = reason.clone();
458
459 let _ = async {
460 let _ = conn.session.clone().close(reason).await;
461 };
462
463 false
464 });
465 }
466
467 pub async fn close_if<F>(&mut self, reason: Option<CloseReason>, condition: F) where F: Fn(&Connection) -> bool {
469 self.connectors.retain(|connection| {
470 if condition(connection) {
471 let reason = reason.clone();
472
473 let _ = async {
474 let _ = connection.session.clone().close(reason).await;
475 };
476
477 false
478 } else {
479 true
480 }
481 });
482 }
483
484 pub async fn close_if_not<F>(&mut self, reason: Option<CloseReason>, condition: F) where F: Fn(&Connection) -> bool {
486 self.connectors.retain(|connection| {
487 if !condition(connection) {
488 let reason = reason.clone();
489
490 let _ = async {
491 let _ = connection.session.clone().close(reason).await;
492 };
493
494 false
495 } else {
496 true
497 }
498 });
499 }
500}
501
502impl Broadcaster {
503 pub fn new() -> Arc<RwLock<Self>> {
505 Arc::new(RwLock::new(Self::default()))
506 }
507
508 pub fn handle(broadcaster: &Arc<RwLock<Self>>, room_id: &String, conn_id: &String, session: Session) -> Arc<RwLock<Self>> {
520 let mut broadcaster_write = broadcaster.write().unwrap();
521
522 broadcaster_write.handle_room(room_id).add_connection(conn_id, session);
523
524 Arc::clone(&broadcaster)
525 }
526
527 pub fn handle_room(&mut self, id: &String) -> &mut Room {
540 if let Some(index) = self.rooms.iter().position(|room| room.id == *id) {
541 return &mut self.rooms[index];
542 }
543
544 self.rooms.push(Room {
545 id: id.clone(),
546 connectors: vec![],
547 });
548
549 self.rooms.last_mut().unwrap()
550 }
551
552 pub fn room(&mut self, id: &String) -> &mut Room {
554 return self.rooms.iter_mut().find(|room| room.id == *id).unwrap();
555 }
556
557 pub fn check_room(&mut self, id: &String) -> Option<&mut Room> {
559 match self.rooms.iter_mut().find(|room| room.id == *id) {
560 Some(room) => Some(room),
561 None => None
562 }
563 }
564
565 pub fn check(&self, id: &String) -> bool {
567 return self.rooms.iter().any(|room| room.id == *id);
568 }
569
570 pub fn each_room_immut<F>(&self, f: F) where F: Fn(&Room) {
585 for room in &self.rooms {
586 f(room);
587 }
588 }
589
590 pub fn each_room<F>(&self, mut f: F) where F: FnMut(&Room) {
611 for room in &self.rooms {
612 f(room);
613 }
614 }
615
616 pub async fn each_room_mut<F>(&mut self, mut f: F) where F: FnMut(&mut Room) {
618 for room in &mut self.rooms {
619 f(room);
620 }
621 }
622
623 pub async fn remove_room(&mut self, id: String) {
643 self.rooms.retain(|room| {
644 if room.id == id {
645 let _ = async {
646 room.clone().close(None).await;
647 };
648
649 false
650 } else {
651 true
652 }
653 });
654 }
655
656 pub fn remove_empty_rooms(&mut self) {
658 self.rooms.retain(|room| {
659 if room.connectors.is_empty() {
660 false
661 } else {
662 true
663 }
664 });
665 }
666
667 pub fn remove_connection(&mut self, id: String) -> Option<Session> {
670 for room in &mut self.rooms {
671 if let Some(pos) = room.connectors.iter().position(|connection| connection.id == id) {
672 let connection = room.connectors.remove(pos);
673
674 return Some(connection.session);
675 }
676 }
677 None
678 }
679}
680
681impl Default for Broadcaster {
682 fn default() -> Self {
683 Self {
684 rooms: vec![],
685 }
686 }
687}