pushevent/
server.rs

1use crate::client::Client;
2use crate::Event;
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::sync::mpsc;
6use std::sync::{Arc, Mutex};
7use std::thread;
8use ws::{listen, Sender};
9
10pub(crate) type ServerRef = Arc<Mutex<RefCell<ServerInner>>>;
11
12/// The main server struct that gets returned when a ws server is opened.
13/// It encapsulates a vector of thread join handles, which holds mainly our Websocket server
14/// thread and a thread which receives messages from our mpsc channel. It also holds our inner
15/// ServerRef which you probably don't need to touch.
16pub struct Server {
17    threads: Vec<thread::JoinHandle<()>>,
18    inner: ServerRef,
19    tx: mpsc::Sender<Event>,
20}
21
22/// This is the inner server structs which holds a HashMap of all clients subscribed to a specific
23/// route. It is used internally to filter to whom we publish events.
24pub(crate) struct ServerInner {
25    clients: HashMap<String, Vec<Sender>>,
26}
27
28impl Server {
29    /// Returns a server instance. As soon as this method is called, a websocket server is opened
30    /// and a thread will start accepting events to be published.
31    /// # Arguments
32    ///
33    /// * `addr` - Static string slice which holds the address on which to open the websocket
34    /// server
35    ///
36    /// # Example
37    /// ```no_run
38    /// use pushevent::server::Server;
39    /// let server = Server::new("127.0.0.1:3012");
40    /// ```
41    pub fn new(addr: &'static str) -> Self {
42        let inner = Arc::new(Mutex::new(RefCell::new(ServerInner::new())));
43        let (tx, rx) = mpsc::channel::<Event>();
44        let mut threads = Vec::new();
45
46        let inner_clone = inner.clone();
47
48        // This thread handles our websocket server
49        threads.push(thread::spawn(move || {
50            listen(addr, |s| Client::new(inner_clone.clone(), s))
51                .expect("Failed to start ws server");
52        }));
53
54        let inner_clone = inner.clone();
55        // This thread handles our mpsc rx channel.
56        // It awaits for new messages to be pushed, it then locks the inner ServerInner mutex,
57        // serializes our event and broadcasts it to the specified resource.
58        // All events received over this channel are of Event type.
59        threads.push(thread::spawn(move || {
60            for event in rx.iter() {
61                inner_clone
62                    .lock()
63                    .unwrap()
64                    .borrow_mut()
65                    .broadcast(&event.get_res(), event.build());
66            }
67        }));
68
69        Self { threads, inner, tx }
70    }
71
72    /// Clones and returns a mpsc tx channel through which we can send events of [`Event`](Event)
73    /// type.
74    ///
75    /// # Example
76    /// ```compile_fail
77    /// use std::thread;
78    ///
79    /// let server = Server::new("127.0.0.1:3012");
80    ///
81    /// loop {
82    ///     let tx = server.get_tx();
83    ///     let _ = std::thread::spawn(move || {
84    ///         tx.send(...);
85    ///     });
86    /// }
87    /// ```
88    pub fn get_tx(&self) -> mpsc::Sender<Event> {
89        self.tx.clone()
90    }
91
92    /// This method cleans up all of our threads and should be called on exit of your main
93    /// application.
94    /// It drains all threads from self.threads and tries to join them.
95    ///
96    /// # Example
97    /// ```no_run
98    /// use pushevent::server::Server;
99    /// let mut server = Server::new("127.0.0.1:3012");
100    /// server.join_threads();
101    /// ```
102    pub fn join_threads(&mut self) {
103        for thread in self.threads.drain(0..) {
104            thread.join().unwrap();
105        }
106    }
107}
108
109impl ServerInner {
110    pub fn new() -> Self {
111        ServerInner {
112            clients: HashMap::new(),
113        }
114    }
115
116    /// Method used internally to add a client to the hashmap based on the route they have
117    /// connected to.
118    /// # Arguments
119    /// * `res` - Resource path to which the client has connected.
120    /// * `sender` - A Sender is a client that has connected to our server
121    ///
122    /// # Example
123    /// ```compile_fail
124    /// let inner = ServerInner::new();
125    /// let sender = Sender {...};
126    ///
127    /// inner.add_client("/hello", sender);
128    /// assert_eq!(inner.clients.len(), 1usize);
129    /// ```
130    pub fn add_client(&mut self, res: &str, sender: &Sender) {
131        match self.clients.get_mut(&res.to_owned()) {
132            Some(x) => x.push(sender.clone()),
133            None => {
134                let _ = self.clients.insert(res.to_owned(), vec![sender.clone()]);
135            }
136        }
137    }
138
139    /// Method used internally to removed clients that have disconnected from the global hashmap so
140    /// that events stop being published to them.
141    ///
142    /// # Arguments
143    /// * `sender` - A Sender is a client that has connected to our server
144    ///
145    /// # Example
146    /// ```compile_fail
147    /// let inner = ServerInner::new();
148    /// let sender = Sender {...};
149    ///
150    /// inner.add_client("/hello", sender);
151    /// assert_eq!(inner.clients.len(), 1usize);
152    ///
153    /// inner.remove_client(sender);
154    /// assert_eq!(inner.clients.len(), 0usize);
155    /// ```
156    pub fn remove_client(&mut self, sender: &Sender) {
157        for vec in self.clients.values_mut() {
158            vec.retain(|x| x.token() != sender.token())
159        }
160    }
161
162    /// Method used internally to broadcast messages to clients subscribed to a specific route. The
163    /// message will only be broadcast once to all connected clients.
164    ///
165    /// # Arguments
166    /// * `res` - String slice which holds the resource path we would like to publish events to.
167    /// * `msg` - String which holds the message we wish to publish.
168    ///
169    /// # Example
170    /// ```compile_fail
171    /// let inner = ServerInner::new();
172    /// inner.broadcast("/hello", "Hello World");
173    /// ```
174    pub fn broadcast(&self, res: &str, msg: String) {
175        let _ = self.clients.get(res).map(|x| {
176            for y in x {
177                y.send(msg.clone()).unwrap()
178            }
179        });
180    }
181}
182
183impl Default for ServerInner {
184    fn default() -> Self {
185        Self::new()
186    }
187}