ddf_simple_websockets/
lib.rs

1//! An easy-to-use WebSocket server.
2//!
3//! To start a WebSocket listener, simply call [`launch()`], and use the
4//! returned [`EventHub`] to react to client messages, connections, and disconnections.
5//!
6//! # Example
7//!
8//! A WebSocket echo server:
9//!
10//! ```no_run
11//! use simple_websockets::{Event, Responder};
12//! use std::collections::HashMap;
13//!
14//! fn main() {
15//!     // listen for WebSockets on port 8080:
16//!     let event_hub = simple_websockets::launch(8080)
17//!         .expect("failed to listen on port 8080");
18//!     // map between client ids and the client's `Responder`:
19//!     let mut clients: HashMap<u64, Responder> = HashMap::new();
20//!
21//!     loop {
22//!         match event_hub.poll_event() {
23//!             Event::Connect(client_id, responder) => {
24//!                 println!("A client connected with id #{}", client_id);
25//!                 // add their Responder to our `clients` map:
26//!                 clients.insert(client_id, responder);
27//!             },
28//!             Event::Disconnect(client_id) => {
29//!                 println!("Client #{} disconnected.", client_id);
30//!                 // remove the disconnected client from the clients map:
31//!                 clients.remove(&client_id);
32//!             },
33//!             Event::Message(client_id, message) => {
34//!                 println!("Received a message from client #{}: {:?}", client_id, message);
35//!                 // retrieve this client's `Responder`:
36//!                 let responder = clients.get(&client_id).unwrap();
37//!                 // echo the message back:
38//!                 responder.send(message);
39//!             },
40//!         }
41//!     }
42//! }
43//! ```
44use tokio::runtime::Runtime;
45use tokio::net::{TcpListener, TcpStream};
46use tokio_tungstenite::{tungstenite, accept_async};
47use futures_util::{StreamExt, SinkExt};
48
49#[derive(Debug)]
50pub enum Error {
51    /// Returned by [`launch`] if the websocket listener thread failed to start
52    FailedToStart,
53}
54
55/// An outgoing/incoming message to/from a websocket.
56#[derive(Debug)]
57pub enum Message {
58    /// A text message
59    Text(String),
60    /// A binary message
61    Binary(Vec<u8>),
62}
63
64impl Message {
65    fn into_tungstenite(self) -> tungstenite::Message {
66        match self {
67            Self::Text(text) => tungstenite::Message::Text(text),
68            Self::Binary(bytes) => tungstenite::Message::Binary(bytes),
69        }
70    }
71
72    fn from_tungstenite(message: tungstenite::Message) -> Option<Self> {
73        match message {
74            tungstenite::Message::Binary(bytes) => Some(Self::Binary(bytes)),
75            tungstenite::Message::Text(text) => Some(Self::Text(text)),
76            _ => None,
77        }
78    }
79}
80
81enum ResponderCommand {
82    Message(Message),
83    CloseConnection,
84}
85
86/// Sends outgoing messages to a websocket.
87/// Every connected websocket client has a corresponding `Responder`.
88///
89/// `Responder`s can be safely cloned and sent across threads, to be used in a
90/// multi-producer single-consumer paradigm.
91///
92/// If a Reponder is dropped while its client is still connected, the connection
93/// will be automatically closed. If there are multiple clones of a Responder,
94/// The client will not be disconnected until the last Responder is dropped.
95#[derive(Debug, Clone)]
96pub struct Responder {
97    tx: flume::Sender<ResponderCommand>,
98    client_id: u64,
99}
100
101impl Responder {
102    fn new(tx: flume::Sender<ResponderCommand>, client_id: u64) -> Self {
103        Self {
104            tx,
105            client_id,
106        }
107    }
108
109    /// Sends a message to the client represented by this `Responder`.
110    ///
111    /// Returns true if the message was sent, or false if it wasn't
112    /// sent (because the client is disconnected).
113    ///
114    /// Note that this *doesn't* need a mutable reference to `self`.
115    pub fn send(&self, message: Message) -> bool {
116        self.tx.send(ResponderCommand::Message(message))
117            .is_ok()
118    }
119
120    /// Closes this client's connection.
121    ///
122    /// Note that this *doesn't* need a mutable reference to `self`.
123    pub fn close(&self) {
124        let _ = self.tx.send(ResponderCommand::CloseConnection);
125    }
126
127    /// The id of the client that this `Responder` is connected to.
128    pub fn client_id(&self) -> u64 {
129        self.client_id
130    }
131}
132
133/// An incoming event from a client.
134/// This can be an incoming message, a new client connection, or a disconnection.
135#[derive(Debug)]
136pub enum Event {
137    /// A new client has connected.
138    Connect(
139        /// id of the client who connected
140        u64,
141        /// [`Responder`] used to send messages back to this client
142        Responder,
143    ),
144
145    /// A client has disconnected.
146    Disconnect(
147        /// id of the client who disconnected
148        u64,
149    ),
150
151    /// An incoming message from a client.
152    Message(
153        /// id of the client who sent the message
154        u64,
155        /// the message
156        Message,
157    ),
158}
159
160/// A queue of incoming events from clients.
161///
162/// The `EventHub` is the centerpiece of this library, and it is where all
163/// messages, connections, and disconnections are received.
164#[derive(Debug)]
165pub struct EventHub {
166    rx: flume::Receiver<Event>,
167}
168
169impl EventHub {
170    fn new(rx: flume::Receiver<Event>) -> Self {
171        Self {
172            rx,
173        }
174    }
175
176    /// Clears the event queue and returns all the events that were in the queue.
177    pub fn drain(&self) -> Vec<Event> {
178        if self.rx.is_disconnected() && self.rx.is_empty() {
179            panic!("EventHub channel disconnected. Panicking because Websocket listener thread was killed.");
180        }
181
182        self.rx.drain().collect()
183    }
184
185    /// Returns the next event, or None if the queue is empty.
186    pub fn next_event(&self) -> Option<Event> {
187        self.rx.try_recv().ok()
188    }
189
190    /// Returns the next event, blocking if the queue is empty.
191    pub fn poll_event(&self) -> Event {
192        self.rx.recv().unwrap()
193    }
194
195    /// Async version of [`poll_event`](Self::poll_event)
196    pub async fn poll_async(&self) -> Event {
197        self.rx.recv_async().await
198            .expect("Parent thread is dead")
199    }
200
201    /// Returns true if there are currently no events in the queue.
202    pub fn is_empty(&self) -> bool {
203        self.rx.is_empty()
204    }
205}
206
207/// Start listening for websocket connections on `port`.
208/// On success, returns an [`EventHub`] for receiving messages and
209/// connection/disconnection notifications.
210pub fn launch(port: u16) -> Result<EventHub, Error> {
211    let (tx, rx) = flume::unbounded();
212
213    std::thread::Builder::new()
214        .name("Websocket listener".to_string())
215        .spawn(move || {
216            start_runtime(tx, port).unwrap();
217        }).map_err(|_| Error::FailedToStart)?;
218
219    Ok(EventHub::new(rx))
220}
221
222fn start_runtime(event_tx: flume::Sender<Event>, port: u16) -> Result<(), Error> {
223    Runtime::new()
224        .map_err(|_| Error::FailedToStart)?
225        .block_on(async {
226            let address = format!("0.0.0.0:{}", port);
227            let listener = TcpListener::bind(&address).await
228                .map_err(|_| Error::FailedToStart)?;
229
230            let mut current_id: u64 = 0;
231            loop {
232                match listener.accept().await {
233                    Ok((stream, _)) => {
234                        tokio::spawn(handle_connection(stream, event_tx.clone(), current_id));
235                        current_id = current_id.wrapping_add(1);
236                    },
237                    _ => {},
238                }
239            }
240        })
241}
242
243async fn handle_connection(stream: TcpStream, event_tx: flume::Sender<Event>, id: u64) {
244    let ws_stream = match accept_async(stream).await {
245        Ok(s) => s,
246        Err(_) => return,
247    };
248
249    let (mut outgoing, mut incoming) = ws_stream.split();
250
251    // channel for the `Responder` to send things to this websocket
252    let (resp_tx, resp_rx) = flume::unbounded();
253
254    event_tx.send(Event::Connect(id, Responder::new(resp_tx, id)))
255        .expect("Parent thread is dead");
256
257    // future that waits for commands from the `Responder`
258    let responder_events = async move {
259        while let Ok(event) = resp_rx.recv_async().await {
260            match event {
261                ResponderCommand::Message(message) => {
262                    if let Err(_) = outgoing.send(message.into_tungstenite()).await {
263                        let _ = outgoing.close().await;
264                        return Ok(());
265                    }
266                },
267                ResponderCommand::CloseConnection => {
268                    let _ = outgoing.close().await;
269                    return Ok(());
270                },
271            }
272        }
273
274        // Disconnect if the `Responder` was dropped without explicitly disconnecting
275        let _ = outgoing.close().await;
276
277        // this future always returns Ok, so that it wont stop the try_join
278        Result::<(), ()>::Ok(())
279    };
280
281    let event_tx2 = event_tx.clone();
282    //future that forwards messages received from the websocket to the event channel
283    let events = async move {
284        while let Some(message) = incoming.next().await {
285            if let Ok(tungstenite_msg) = message {
286                if let Some(msg) = Message::from_tungstenite(tungstenite_msg) {
287                    event_tx2.send(Event::Message(id, msg))
288                        .expect("Parent thread is dead");
289                }
290            }
291        }
292
293        // stop the try_join once the websocket is closed and all pending incoming
294        // messages have been sent to the event channel.
295        // stopping the try_join causes responder_events to be closed too so that the
296        // `Receiver` cant send any more messages.
297        Result::<(), ()>::Err(())
298    };
299
300    // use try_join so that when `events` returns Err (the websocket closes), responder_events will be stopped too
301    let _ = futures_util::try_join!(responder_events, events);
302
303    event_tx.send(Event::Disconnect(id))
304        .expect("Parent thread is dead");
305}