ddf_simple_websockets/lib.rs
1//! An easy-to-use WebSocket server.
2//!
3//! To start a WebSocket listener, simply call [`launch()`], and use the
4//! returned [`EventHub`] to react to client messages, connections, and disconnections.
5//!
6//! # Example
7//!
8//! A WebSocket echo server:
9//!
10//! ```no_run
11//! use simple_websockets::{Event, Responder};
12//! use std::collections::HashMap;
13//!
14//! fn main() {
15//! // listen for WebSockets on port 8080:
16//! let event_hub = simple_websockets::launch(8080)
17//! .expect("failed to listen on port 8080");
18//! // map between client ids and the client's `Responder`:
19//! let mut clients: HashMap<u64, Responder> = HashMap::new();
20//!
21//! loop {
22//! match event_hub.poll_event() {
23//! Event::Connect(client_id, responder) => {
24//! println!("A client connected with id #{}", client_id);
25//! // add their Responder to our `clients` map:
26//! clients.insert(client_id, responder);
27//! },
28//! Event::Disconnect(client_id) => {
29//! println!("Client #{} disconnected.", client_id);
30//! // remove the disconnected client from the clients map:
31//! clients.remove(&client_id);
32//! },
33//! Event::Message(client_id, message) => {
34//! println!("Received a message from client #{}: {:?}", client_id, message);
35//! // retrieve this client's `Responder`:
36//! let responder = clients.get(&client_id).unwrap();
37//! // echo the message back:
38//! responder.send(message);
39//! },
40//! }
41//! }
42//! }
43//! ```
44use tokio::runtime::Runtime;
45use tokio::net::{TcpListener, TcpStream};
46use tokio_tungstenite::{tungstenite, accept_async};
47use futures_util::{StreamExt, SinkExt};
48
/// Errors reported by this library.
#[derive(Debug)]
pub enum Error {
    /// Returned by [`launch`] if the websocket listener thread failed to start
    FailedToStart,
}

impl std::fmt::Display for Error {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::FailedToStart => f.write_str("failed to start the websocket listener"),
        }
    }
}

// Implementing the standard error trait lets callers propagate this error
// with `?` into `Box<dyn std::error::Error>` and similar error types.
impl std::error::Error for Error {}
54
/// An outgoing/incoming message to/from a websocket.
///
/// Messages can be cloned, so the same message can be sent to several
/// clients, and compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
    /// A text message
    Text(String),
    /// A binary message
    Binary(Vec<u8>),
}
63
64impl Message {
65 fn into_tungstenite(self) -> tungstenite::Message {
66 match self {
67 Self::Text(text) => tungstenite::Message::Text(text),
68 Self::Binary(bytes) => tungstenite::Message::Binary(bytes),
69 }
70 }
71
72 fn from_tungstenite(message: tungstenite::Message) -> Option<Self> {
73 match message {
74 tungstenite::Message::Binary(bytes) => Some(Self::Binary(bytes)),
75 tungstenite::Message::Text(text) => Some(Self::Text(text)),
76 _ => None,
77 }
78 }
79}
80
81enum ResponderCommand {
82 Message(Message),
83 CloseConnection,
84}
85
86/// Sends outgoing messages to a websocket.
87/// Every connected websocket client has a corresponding `Responder`.
88///
89/// `Responder`s can be safely cloned and sent across threads, to be used in a
90/// multi-producer single-consumer paradigm.
91///
92/// If a Reponder is dropped while its client is still connected, the connection
93/// will be automatically closed. If there are multiple clones of a Responder,
94/// The client will not be disconnected until the last Responder is dropped.
95#[derive(Debug, Clone)]
96pub struct Responder {
97 tx: flume::Sender<ResponderCommand>,
98 client_id: u64,
99}
100
101impl Responder {
102 fn new(tx: flume::Sender<ResponderCommand>, client_id: u64) -> Self {
103 Self {
104 tx,
105 client_id,
106 }
107 }
108
109 /// Sends a message to the client represented by this `Responder`.
110 ///
111 /// Returns true if the message was sent, or false if it wasn't
112 /// sent (because the client is disconnected).
113 ///
114 /// Note that this *doesn't* need a mutable reference to `self`.
115 pub fn send(&self, message: Message) -> bool {
116 self.tx.send(ResponderCommand::Message(message))
117 .is_ok()
118 }
119
120 /// Closes this client's connection.
121 ///
122 /// Note that this *doesn't* need a mutable reference to `self`.
123 pub fn close(&self) {
124 let _ = self.tx.send(ResponderCommand::CloseConnection);
125 }
126
127 /// The id of the client that this `Responder` is connected to.
128 pub fn client_id(&self) -> u64 {
129 self.client_id
130 }
131}
132
133/// An incoming event from a client.
134/// This can be an incoming message, a new client connection, or a disconnection.
135#[derive(Debug)]
136pub enum Event {
137 /// A new client has connected.
138 Connect(
139 /// id of the client who connected
140 u64,
141 /// [`Responder`] used to send messages back to this client
142 Responder,
143 ),
144
145 /// A client has disconnected.
146 Disconnect(
147 /// id of the client who disconnected
148 u64,
149 ),
150
151 /// An incoming message from a client.
152 Message(
153 /// id of the client who sent the message
154 u64,
155 /// the message
156 Message,
157 ),
158}
159
160/// A queue of incoming events from clients.
161///
162/// The `EventHub` is the centerpiece of this library, and it is where all
163/// messages, connections, and disconnections are received.
164#[derive(Debug)]
165pub struct EventHub {
166 rx: flume::Receiver<Event>,
167}
168
169impl EventHub {
170 fn new(rx: flume::Receiver<Event>) -> Self {
171 Self {
172 rx,
173 }
174 }
175
176 /// Clears the event queue and returns all the events that were in the queue.
177 pub fn drain(&self) -> Vec<Event> {
178 if self.rx.is_disconnected() && self.rx.is_empty() {
179 panic!("EventHub channel disconnected. Panicking because Websocket listener thread was killed.");
180 }
181
182 self.rx.drain().collect()
183 }
184
185 /// Returns the next event, or None if the queue is empty.
186 pub fn next_event(&self) -> Option<Event> {
187 self.rx.try_recv().ok()
188 }
189
190 /// Returns the next event, blocking if the queue is empty.
191 pub fn poll_event(&self) -> Event {
192 self.rx.recv().unwrap()
193 }
194
195 /// Async version of [`poll_event`](Self::poll_event)
196 pub async fn poll_async(&self) -> Event {
197 self.rx.recv_async().await
198 .expect("Parent thread is dead")
199 }
200
201 /// Returns true if there are currently no events in the queue.
202 pub fn is_empty(&self) -> bool {
203 self.rx.is_empty()
204 }
205}
206
207/// Start listening for websocket connections on `port`.
208/// On success, returns an [`EventHub`] for receiving messages and
209/// connection/disconnection notifications.
210pub fn launch(port: u16) -> Result<EventHub, Error> {
211 let (tx, rx) = flume::unbounded();
212
213 std::thread::Builder::new()
214 .name("Websocket listener".to_string())
215 .spawn(move || {
216 start_runtime(tx, port).unwrap();
217 }).map_err(|_| Error::FailedToStart)?;
218
219 Ok(EventHub::new(rx))
220}
221
222fn start_runtime(event_tx: flume::Sender<Event>, port: u16) -> Result<(), Error> {
223 Runtime::new()
224 .map_err(|_| Error::FailedToStart)?
225 .block_on(async {
226 let address = format!("0.0.0.0:{}", port);
227 let listener = TcpListener::bind(&address).await
228 .map_err(|_| Error::FailedToStart)?;
229
230 let mut current_id: u64 = 0;
231 loop {
232 match listener.accept().await {
233 Ok((stream, _)) => {
234 tokio::spawn(handle_connection(stream, event_tx.clone(), current_id));
235 current_id = current_id.wrapping_add(1);
236 },
237 _ => {},
238 }
239 }
240 })
241}
242
243async fn handle_connection(stream: TcpStream, event_tx: flume::Sender<Event>, id: u64) {
244 let ws_stream = match accept_async(stream).await {
245 Ok(s) => s,
246 Err(_) => return,
247 };
248
249 let (mut outgoing, mut incoming) = ws_stream.split();
250
251 // channel for the `Responder` to send things to this websocket
252 let (resp_tx, resp_rx) = flume::unbounded();
253
254 event_tx.send(Event::Connect(id, Responder::new(resp_tx, id)))
255 .expect("Parent thread is dead");
256
257 // future that waits for commands from the `Responder`
258 let responder_events = async move {
259 while let Ok(event) = resp_rx.recv_async().await {
260 match event {
261 ResponderCommand::Message(message) => {
262 if let Err(_) = outgoing.send(message.into_tungstenite()).await {
263 let _ = outgoing.close().await;
264 return Ok(());
265 }
266 },
267 ResponderCommand::CloseConnection => {
268 let _ = outgoing.close().await;
269 return Ok(());
270 },
271 }
272 }
273
274 // Disconnect if the `Responder` was dropped without explicitly disconnecting
275 let _ = outgoing.close().await;
276
277 // this future always returns Ok, so that it wont stop the try_join
278 Result::<(), ()>::Ok(())
279 };
280
281 let event_tx2 = event_tx.clone();
282 //future that forwards messages received from the websocket to the event channel
283 let events = async move {
284 while let Some(message) = incoming.next().await {
285 if let Ok(tungstenite_msg) = message {
286 if let Some(msg) = Message::from_tungstenite(tungstenite_msg) {
287 event_tx2.send(Event::Message(id, msg))
288 .expect("Parent thread is dead");
289 }
290 }
291 }
292
293 // stop the try_join once the websocket is closed and all pending incoming
294 // messages have been sent to the event channel.
295 // stopping the try_join causes responder_events to be closed too so that the
296 // `Receiver` cant send any more messages.
297 Result::<(), ()>::Err(())
298 };
299
300 // use try_join so that when `events` returns Err (the websocket closes), responder_events will be stopped too
301 let _ = futures_util::try_join!(responder_events, events);
302
303 event_tx.send(Event::Disconnect(id))
304 .expect("Parent thread is dead");
305}