pushevent/server.rs
1use crate::client::Client;
2use crate::Event;
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::sync::mpsc;
6use std::sync::{Arc, Mutex};
7use std::thread;
8use ws::{listen, Sender};
9
10pub(crate) type ServerRef = Arc<Mutex<RefCell<ServerInner>>>;
11
12/// The main server struct that gets returned when a ws server is opened.
13/// It encapsulates a vector of thread join handles, which holds mainly our Websocket server
14/// thread and a thread which receives messages from our mpsc channel. It also holds our inner
15/// ServerRef which you probably don't need to touch.
16pub struct Server {
17 threads: Vec<thread::JoinHandle<()>>,
18 inner: ServerRef,
19 tx: mpsc::Sender<Event>,
20}
21
22/// This is the inner server structs which holds a HashMap of all clients subscribed to a specific
23/// route. It is used internally to filter to whom we publish events.
24pub(crate) struct ServerInner {
25 clients: HashMap<String, Vec<Sender>>,
26}
27
28impl Server {
29 /// Returns a server instance. As soon as this method is called, a websocket server is opened
30 /// and a thread will start accepting events to be published.
31 /// # Arguments
32 ///
33 /// * `addr` - Static string slice which holds the address on which to open the websocket
34 /// server
35 ///
36 /// # Example
37 /// ```no_run
38 /// use pushevent::server::Server;
39 /// let server = Server::new("127.0.0.1:3012");
40 /// ```
41 pub fn new(addr: &'static str) -> Self {
42 let inner = Arc::new(Mutex::new(RefCell::new(ServerInner::new())));
43 let (tx, rx) = mpsc::channel::<Event>();
44 let mut threads = Vec::new();
45
46 let inner_clone = inner.clone();
47
48 // This thread handles our websocket server
49 threads.push(thread::spawn(move || {
50 listen(addr, |s| Client::new(inner_clone.clone(), s))
51 .expect("Failed to start ws server");
52 }));
53
54 let inner_clone = inner.clone();
55 // This thread handles our mpsc rx channel.
56 // It awaits for new messages to be pushed, it then locks the inner ServerInner mutex,
57 // serializes our event and broadcasts it to the specified resource.
58 // All events received over this channel are of Event type.
59 threads.push(thread::spawn(move || {
60 for event in rx.iter() {
61 inner_clone
62 .lock()
63 .unwrap()
64 .borrow_mut()
65 .broadcast(&event.get_res(), event.build());
66 }
67 }));
68
69 Self { threads, inner, tx }
70 }
71
72 /// Clones and returns a mpsc tx channel through which we can send events of [`Event`](Event)
73 /// type.
74 ///
75 /// # Example
76 /// ```compile_fail
77 /// use std::thread;
78 ///
79 /// let server = Server::new("127.0.0.1:3012");
80 ///
81 /// loop {
82 /// let tx = server.get_tx();
83 /// let _ = std::thread::spawn(move || {
84 /// tx.send(...);
85 /// });
86 /// }
87 /// ```
88 pub fn get_tx(&self) -> mpsc::Sender<Event> {
89 self.tx.clone()
90 }
91
92 /// This method cleans up all of our threads and should be called on exit of your main
93 /// application.
94 /// It drains all threads from self.threads and tries to join them.
95 ///
96 /// # Example
97 /// ```no_run
98 /// use pushevent::server::Server;
99 /// let mut server = Server::new("127.0.0.1:3012");
100 /// server.join_threads();
101 /// ```
102 pub fn join_threads(&mut self) {
103 for thread in self.threads.drain(0..) {
104 thread.join().unwrap();
105 }
106 }
107}
108
109impl ServerInner {
110 pub fn new() -> Self {
111 ServerInner {
112 clients: HashMap::new(),
113 }
114 }
115
116 /// Method used internally to add a client to the hashmap based on the route they have
117 /// connected to.
118 /// # Arguments
119 /// * `res` - Resource path to which the client has connected.
120 /// * `sender` - A Sender is a client that has connected to our server
121 ///
122 /// # Example
123 /// ```compile_fail
124 /// let inner = ServerInner::new();
125 /// let sender = Sender {...};
126 ///
127 /// inner.add_client("/hello", sender);
128 /// assert_eq!(inner.clients.len(), 1usize);
129 /// ```
130 pub fn add_client(&mut self, res: &str, sender: &Sender) {
131 match self.clients.get_mut(&res.to_owned()) {
132 Some(x) => x.push(sender.clone()),
133 None => {
134 let _ = self.clients.insert(res.to_owned(), vec![sender.clone()]);
135 }
136 }
137 }
138
139 /// Method used internally to removed clients that have disconnected from the global hashmap so
140 /// that events stop being published to them.
141 ///
142 /// # Arguments
143 /// * `sender` - A Sender is a client that has connected to our server
144 ///
145 /// # Example
146 /// ```compile_fail
147 /// let inner = ServerInner::new();
148 /// let sender = Sender {...};
149 ///
150 /// inner.add_client("/hello", sender);
151 /// assert_eq!(inner.clients.len(), 1usize);
152 ///
153 /// inner.remove_client(sender);
154 /// assert_eq!(inner.clients.len(), 0usize);
155 /// ```
156 pub fn remove_client(&mut self, sender: &Sender) {
157 for vec in self.clients.values_mut() {
158 vec.retain(|x| x.token() != sender.token())
159 }
160 }
161
162 /// Method used internally to broadcast messages to clients subscribed to a specific route. The
163 /// message will only be broadcast once to all connected clients.
164 ///
165 /// # Arguments
166 /// * `res` - String slice which holds the resource path we would like to publish events to.
167 /// * `msg` - String which holds the message we wish to publish.
168 ///
169 /// # Example
170 /// ```compile_fail
171 /// let inner = ServerInner::new();
172 /// inner.broadcast("/hello", "Hello World");
173 /// ```
174 pub fn broadcast(&self, res: &str, msg: String) {
175 let _ = self.clients.get(res).map(|x| {
176 for y in x {
177 y.send(msg.clone()).unwrap()
178 }
179 });
180 }
181}
182
183impl Default for ServerInner {
184 fn default() -> Self {
185 Self::new()
186 }
187}