humphrey_ws 0.4.0

WebSocket support for the Humphrey web server.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
//! Provides asynchronous WebSocket functionality.

#![allow(clippy::new_without_default)]

use crate::handler::async_websocket_handler;
use crate::message::Message;
use crate::ping::Heartbeat;
use crate::restion::Restion;
use crate::stream::WebsocketStream;

use humphrey::thread::pool::ThreadPool;
use humphrey::App;

use std::collections::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{sleep, spawn};
use std::time::{Duration, Instant};

/// Represents an asynchronous WebSocket app.
///
/// The app owns every connected stream and services them from a single polling loop
/// (see `run`), while user-supplied event handlers are executed on an internal thread pool.
/// Handlers never touch the underlying streams directly: they are handed an `AsyncStream`
/// and queue outgoing messages through a channel which the polling loop drains.
pub struct AsyncWebsocketApp<State>
where
    State: Send + Sync + 'static,
{
    /// Represents the link to a Humphrey application.
    ///
    /// This may be:
    /// - `HumphreyLink::Internal`, in which case the app uses its own internal Humphrey application
    /// - `HumphreyLink::External`, in which case the app is linked to an external Humphrey application and receives connections through a channel
    ///
    /// Each enum variant has corresponding fields for the configuration.
    humphrey_link: HumphreyLink,
    /// Represents the state of the application, shared with every handler invocation.
    state: Arc<State>,
    /// The internal thread pool of the application, on which event handlers are executed.
    thread_pool: ThreadPool,
    /// The amount of time to sleep between iterations of the polling loop.
    /// If this is `None`, the loop does not sleep between iterations.
    poll_interval: Option<Duration>,
    /// Ping configuration. If this is `None`, no pings are sent and no pong timeout is enforced.
    heartbeat: Option<Heartbeat>,
    /// A hashmap with the peer addresses as the keys and the actual streams as the values.
    streams: HashMap<SocketAddr, WebsocketStream>,
    /// A receiver which is sent new streams to add to the hashmap.
    incoming_streams: Receiver<WebsocketStream>,
    /// A receiver which receives messages from handler threads to forward to clients.
    outgoing_messages: Receiver<OutgoingMessage>,
    /// A sender which is used by handler threads to send messages to clients.
    /// It is cloned into every `AsyncStream` and `AsyncSender` created by the app.
    message_sender: Sender<OutgoingMessage>,
    /// The event handler called when a new client connects.
    on_connect: Option<Box<dyn EventHandler<State>>>,
    /// The event handler called when a client disconnects.
    on_disconnect: Option<Box<dyn EventHandler<State>>>,
    /// The event handler called when a client sends a message.
    on_message: Option<Box<dyn MessageHandler<State>>>,
}

/// Represents an asynchronous WebSocket stream.
///
/// This is what is passed to the handler in place of the actual stream. It is able to send
///   messages back to the stream using the sender and the stream is identified by its address.
pub struct AsyncStream {
    /// The address of the client which this handle refers to.
    addr: SocketAddr,
    /// The channel on which outgoing messages are queued for the app to deliver.
    sender: Sender<OutgoingMessage>,
    /// Whether the client was connected when this handle was created.
    /// `AsyncStream::send` asserts that this is `true`.
    connected: bool,
}

/// Represents a global sender which can send messages to clients without waiting for events.
///
/// It wraps a clone of the app's outgoing message channel and is obtained from
/// `AsyncWebsocketApp::sender`.
pub struct AsyncSender(Sender<OutgoingMessage>);

/// Represents a message to be sent from the server to a client.
///
/// Values of this type are queued on the app's outgoing channel and delivered by the
/// polling loop in `AsyncWebsocketApp::run`.
pub enum OutgoingMessage {
    /// A message to be sent to a specific client, identified by its address.
    /// If no stream with that address is currently held by the app, the message is dropped.
    Message(SocketAddr, Message),
    /// A message to be sent to every connected client.
    Broadcast(Message),
}

/// Represents the link to a Humphrey application.
///
/// This may be:
/// - `HumphreyLink::Internal`, in which case the app uses its own internal Humphrey application
/// - `HumphreyLink::External`, in which case the app is linked to an external Humphrey application and receives connections through a channel
///
/// Each enum variant has corresponding fields for the configuration.
pub enum HumphreyLink {
    /// The app uses its own internal Humphrey application.
    ///
    /// The fields are the Humphrey application itself and the address it is run on
    /// when the WebSocket app is started.
    Internal(Box<App>, SocketAddr),
    /// The app is linked to an external Humphrey application and receives connections through a channel.
    ///
    /// The field is the sending half of that channel (the "connect hook"), as returned by
    /// `AsyncWebsocketApp::connect_hook`.
    External(Arc<Mutex<Sender<WebsocketStream>>>),
}

/// Represents a function able to handle a WebSocket event (a connection or disconnection).
/// It is passed the stream which triggered the event as well as the app's state.
///
/// The trait is implemented automatically for every function or closure with a matching
/// signature which is `Send + Sync + 'static`. Handlers are executed on the app's thread pool.
///
/// ## Example
/// A basic example of an event handler would be as follows:
/// ```
/// fn connection_handler(stream: AsyncStream, state: Arc<()>) {
///     println!("A new client connected! {:?}", stream.peer_addr());
///
///     stream.send(Message::new("Hello, World!"));
/// }
/// ```
pub trait EventHandler<S>: Fn(AsyncStream, Arc<S>) + Send + Sync + 'static {}
impl<T, S> EventHandler<S> for T where T: Fn(AsyncStream, Arc<S>) + Send + Sync + 'static {}

/// Represents a function able to handle a message event.
/// It is passed the stream which sent the message, the message and the app's state.
///
/// The trait is implemented automatically for every function or closure with a matching
/// signature which is `Send + Sync + 'static`. Handlers are executed on the app's thread pool.
///
/// ## Example
/// A basic example of a message handler would be as follows:
/// ```
/// fn message_handler(stream: AsyncStream, message: Message, state: Arc<()>) {
///     println!("A message was received from {:?}: {}", stream.peer_addr(), message.text().unwrap());
///
///     stream.send(Message::new("Message received."));
/// }
/// ```
pub trait MessageHandler<S>: Fn(AsyncStream, Message, Arc<S>) + Send + Sync + 'static {}
impl<T, S> MessageHandler<S> for T where T: Fn(AsyncStream, Message, Arc<S>) + Send + Sync + 'static {}

impl<State> AsyncWebsocketApp<State>
where
    State: Send + Sync + 'static,
{
    /// Creates a new asynchronous WebSocket app.
    pub fn new() -> Self
    where
        State: Default,
    {
        let (connect_hook, incoming_streams) = channel();
        let connect_hook = Arc::new(Mutex::new(connect_hook));

        let (message_sender, outgoing_messages) = channel();

        let humphrey_app = App::new_with_config(1, ())
            .with_websocket_route("/*", async_websocket_handler(connect_hook));

        Self {
            humphrey_link: HumphreyLink::Internal(
                Box::new(humphrey_app),
                "0.0.0.0:80".to_socket_addrs().unwrap().next().unwrap(),
            ),
            state: Default::default(),
            poll_interval: Some(Duration::from_millis(10)),
            heartbeat: None,
            thread_pool: ThreadPool::new(32),
            streams: Default::default(),
            incoming_streams,
            outgoing_messages,
            message_sender,
            on_connect: None,
            on_disconnect: None,
            on_message: None,
        }
    }

    /// Creates a new asynchronous WebSocket app with a custom state and configuration.
    ///
    /// - `state`: The state of the application.
    /// - `handler_threads`: The size of the handler thread pool.
    /// - `connection_threads`: The size of the connection handler thread pool (the underlying Humphrey app).
    pub fn new_with_config(
        state: State,
        handler_threads: usize,
        connection_threads: usize,
    ) -> Self {
        let (connect_hook, incoming_streams) = channel();
        let connect_hook = Arc::new(Mutex::new(connect_hook));

        let (message_sender, outgoing_messages) = channel();

        let humphrey_app = App::new_with_config(connection_threads, ())
            .with_websocket_route("/*", async_websocket_handler(connect_hook));

        Self {
            humphrey_link: HumphreyLink::Internal(
                Box::new(humphrey_app),
                "0.0.0.0:80".to_socket_addrs().unwrap().next().unwrap(),
            ),
            state: Arc::new(state),
            poll_interval: Some(Duration::from_millis(10)),
            heartbeat: None,
            thread_pool: ThreadPool::new(handler_threads),
            streams: Default::default(),
            incoming_streams,
            outgoing_messages,
            message_sender,
            on_connect: None,
            on_disconnect: None,
            on_message: None,
        }
    }

    /// Creates a new asynchronous WebSocket app without creating a Humphrey application.
    ///
    /// This is useful if you want to use the app as part of a Humphrey application, or if you want to use TLS.
    ///
    /// You'll need to manually link the app to a Humphrey application using the `connect_hook`.
    pub fn new_unlinked() -> Self
    where
        State: Default,
    {
        let (connect_hook, incoming_streams) = channel();
        let connect_hook = Arc::new(Mutex::new(connect_hook));

        let (message_sender, outgoing_messages) = channel();

        Self {
            humphrey_link: HumphreyLink::External(connect_hook),
            state: Default::default(),
            poll_interval: Some(Duration::from_millis(10)),
            heartbeat: None,
            thread_pool: ThreadPool::new(32),
            streams: Default::default(),
            incoming_streams,
            outgoing_messages,
            message_sender,
            on_connect: None,
            on_disconnect: None,
            on_message: None,
        }
    }

    /// Creates a new asynchronous WebSocket app with a custom state and configuration, without creating a Humphrey application.
    ///
    /// This is useful if you want to use the app as part of a Humphrey application, or if you want to use TLS.
    ///
    /// You'll need to manually link the app to a Humphrey application using the `connect_hook`.
    ///
    /// - `state`: The state of the application.
    /// - `handler_threads`: The size of the handler thread pool.
    pub fn new_unlinked_with_config(state: State, handler_threads: usize) -> Self {
        let (connect_hook, incoming_streams) = channel();
        let connect_hook = Arc::new(Mutex::new(connect_hook));

        let (message_sender, outgoing_messages) = channel();

        Self {
            humphrey_link: HumphreyLink::External(connect_hook),
            state: Arc::new(state),
            poll_interval: Some(Duration::from_millis(10)),
            heartbeat: None,
            thread_pool: ThreadPool::new(handler_threads),
            streams: Default::default(),
            incoming_streams,
            outgoing_messages,
            message_sender,
            on_connect: None,
            on_disconnect: None,
            on_message: None,
        }
    }

    /// Returns a shared handle to the connection hook of the application.
    /// This is used by Humphrey Core to send new streams to the app.
    ///
    /// If the app uses an internal Humphrey application, this will return `None`.
    pub fn connect_hook(&self) -> Option<Arc<Mutex<Sender<WebsocketStream>>>> {
        if let HumphreyLink::External(hook) = &self.humphrey_link {
            Some(Arc::clone(hook))
        } else {
            None
        }
    }

    /// Returns a new `AsyncSender`, which can be used to send messages.
    ///
    /// The sender holds its own clone of the app's outgoing message channel.
    pub fn sender(&self) -> AsyncSender {
        let outgoing = self.message_sender.clone();
        AsyncSender(outgoing)
    }

    /// Gets a shared handle to the app's state.
    ///
    /// This should only be used in the main thread, as the state is passed to event handlers otherwise.
    pub fn get_state(&self) -> Arc<State> {
        Arc::clone(&self.state)
    }

    /// Set the event handler called when a new client connects.
    /// Any previously set connect handler is replaced.
    pub fn on_connect(&mut self, handler: impl EventHandler<State>) {
        let boxed: Box<dyn EventHandler<State>> = Box::new(handler);
        self.on_connect = Some(boxed);
    }

    /// Set the event handler called when a client disconnects.
    /// Any previously set disconnect handler is replaced.
    pub fn on_disconnect(&mut self, handler: impl EventHandler<State>) {
        let boxed: Box<dyn EventHandler<State>> = Box::new(handler);
        self.on_disconnect = Some(boxed);
    }

    /// Set the message handler called when a client sends a message.
    /// Any previously set message handler is replaced.
    pub fn on_message(&mut self, handler: impl MessageHandler<State>) {
        let boxed: Box<dyn MessageHandler<State>> = Box::new(handler);
        self.on_message = Some(boxed);
    }

    /// Set the event handler called when a new client connects.
    /// Returns itself for use in a builder pattern.
    pub fn with_connect_handler(mut self, handler: impl EventHandler<State>) -> Self {
        self.on_connect(handler);
        self
    }

    /// Set the event handler called when a client disconnects.
    /// Returns itself for use in a builder pattern.
    pub fn with_disconnect_handler(mut self, handler: impl EventHandler<State>) -> Self {
        self.on_disconnect(handler);
        self
    }

    /// Set the message handler called when a client sends a message.
    /// Returns itself for use in a builder pattern.
    pub fn with_message_handler(mut self, handler: impl MessageHandler<State>) -> Self {
        self.on_message(handler);
        self
    }

    /// Set the address to run the application on.
    /// Returns itself for use in a builder pattern.
    ///
    /// This function has no effect if the app does not manage its own internal Humphrey application;
    /// in that case the address is not even resolved.
    ///
    /// # Panics
    ///
    /// Panics if the app is internally managed and `address` fails to resolve or resolves
    /// to no socket addresses.
    pub fn with_address<T>(mut self, address: T) -> Self
    where
        T: ToSocketAddrs,
    {
        // Only an internally managed Humphrey application has an address to replace.
        if let HumphreyLink::Internal(_, bound_address) = &mut self.humphrey_link {
            *bound_address = address.to_socket_addrs().unwrap().next().unwrap();
        }

        self
    }

    /// Sets the polling interval of the async app.
    ///
    /// By default, this is 10ms, meaning the app will check for new events 100 times a second.
    pub fn with_polling_interval(mut self, interval: Option<Duration>) -> Self {
        self.poll_interval = interval;
        self
    }

    /// Sets the heartbeat configuration for the async app.
    ///
    /// By default, this is off, meaning the app will not send heartbeats. If your application needs to detect
    ///   disconnections which occur suddenly, as in without sending a "close" frame, you should set this up.
    ///   It is particularly useful for detecting disconnections caused by network issues, which would not be ordinarily
    ///   detected by the client.
    pub fn with_heartbeat(mut self, heartbeat: Heartbeat) -> Self {
        self.heartbeat = Some(heartbeat);
        self
    }

    /// Start the application on the main thread.
    ///
    /// This consumes the app and runs its event loop on the calling thread. The loop has no
    /// exit condition, so this function does not return.
    ///
    /// If the app manages its own internal Humphrey application, that application is run on a
    /// separate spawned thread; an error returned from it panics that thread, not the caller.
    ///
    /// Each iteration of the event loop does the following, in order:
    /// 1. Reads all pending messages from every stream, dispatching the message handler for each
    ///    and the disconnect handler (followed by removal of the stream) on a read error.
    /// 2. If a heartbeat is configured, removes streams whose last pong is older than the timeout
    ///    (dispatching the disconnect handler) and pings the remaining streams when a ping is due.
    /// 3. Adds newly accepted streams, dispatching the connect handler for each.
    /// 4. Delivers all queued outgoing messages and broadcasts.
    /// 5. Sleeps for the polling interval, if one is set.
    ///
    /// All handlers are executed on the app's thread pool rather than on the event loop thread.
    pub fn run(mut self) {
        // Ensure that the underlying Humphrey application is running if it is internal.
        if let HumphreyLink::Internal(app, addr) = self.humphrey_link {
            spawn(move || app.run(addr).unwrap());
        }

        self.thread_pool.start();

        // Wrap each handler in an `Arc` so that a clone can be moved into every thread pool job.
        let connect_handler = self.on_connect.map(Arc::new);
        let disconnect_handler = self.on_disconnect.map(Arc::new);
        let message_handler = self.on_message.map(Arc::new);

        let mut last_ping = Instant::now();

        loop {
            // Snapshot the addresses so that streams can be removed from the map while iterating.
            let keys: Vec<SocketAddr> = self.streams.keys().copied().collect();

            // Calculate whether a ping should be sent this iteration.
            let will_ping = self
                .heartbeat
                .as_ref()
                .map(|config| {
                    let will_ping = last_ping.elapsed() >= config.interval;

                    if will_ping {
                        last_ping = Instant::now();
                    }

                    will_ping
                })
                .unwrap_or(false);

            // Check for messages and status on each stream.
            for addr in keys {
                // Drain every message currently available on this stream before moving on.
                'inner: loop {
                    // The address came from the snapshot above and a stream is only removed
                    // immediately before leaving this inner loop, so the entry is present.
                    let stream = self.streams.get_mut(&addr).unwrap();

                    match stream.recv_nonblocking() {
                        Restion::Ok(message) => {
                            if let Some(handler) = &message_handler {
                                let async_stream =
                                    AsyncStream::new(addr, self.message_sender.clone());
                                let cloned_state = self.state.clone();
                                let cloned_handler = handler.clone();

                                self.thread_pool.execute(move || {
                                    (cloned_handler)(async_stream, message, cloned_state)
                                });
                            }
                        }
                        // A receive error is processed as a disconnection of this client.
                        Restion::Err(_) => {
                            if let Some(handler) = &disconnect_handler {
                                let async_stream =
                                    AsyncStream::disconnected(addr, self.message_sender.clone());
                                let cloned_state = self.state.clone();
                                let cloned_handler = handler.clone();

                                self.thread_pool
                                    .execute(move || (cloned_handler)(async_stream, cloned_state));
                            }

                            self.streams.remove(&addr);
                            break 'inner;
                        }
                        // Nothing more to read from this stream for now.
                        Restion::None => break 'inner,
                    }
                }

                // The stream is absent here if it was removed as disconnected above.
                if let Some(stream) = self.streams.get_mut(&addr) {
                    // If the stream has timed out without sending a close frame, process it as a disconnection.
                    if let Some(ping) = &self.heartbeat {
                        if stream.last_pong.elapsed() >= ping.timeout {
                            if let Some(handler) = &disconnect_handler {
                                let async_stream =
                                    AsyncStream::disconnected(addr, self.message_sender.clone());
                                let cloned_state = self.state.clone();
                                let cloned_handler = handler.clone();

                                self.thread_pool
                                    .execute(move || (cloned_handler)(async_stream, cloned_state));
                            }

                            self.streams.remove(&addr);
                            continue;
                        }
                    }

                    // If a ping is due, send one.
                    if will_ping {
                        // Errors from pinging are deliberately ignored here.
                        stream.ping().ok();
                    }
                }
            }

            // Add any streams awaiting connection.
            // Streams whose peer address cannot be determined are dropped without being added.
            for (addr, stream) in self
                .incoming_streams
                .try_iter()
                .filter_map(|s| s.peer_addr().map(|a| (a, s)).ok())
            {
                if let Some(handler) = &connect_handler {
                    let async_stream = AsyncStream::new(addr, self.message_sender.clone());
                    let cloned_state = self.state.clone();
                    let cloned_handler = handler.clone();

                    self.thread_pool.execute(move || {
                        (cloned_handler)(async_stream, cloned_state);
                    });
                }

                self.streams.insert(addr, stream);
            }

            // Deliver every message queued by handlers and senders since the last iteration.
            for message in self.outgoing_messages.try_iter() {
                match message {
                    OutgoingMessage::Message(addr, message) => {
                        // Messages for addresses with no stream in the map are dropped.
                        if let Some(stream) = self.streams.get_mut(&addr) {
                            // Ignore errors with sending for now, and deal with them in the next iteration.
                            stream.send(message).ok();
                        }
                    }
                    OutgoingMessage::Broadcast(message) => {
                        // Convert to a frame once and send the same frame to every stream.
                        let frame = message.to_frame();
                        for stream in self.streams.values_mut() {
                            // Ignore errors with sending for now, and deal with them in the next iteration.
                            stream.send_raw(&frame).ok();
                        }
                    }
                }
            }

            // Pause before the next iteration unless sleeping has been disabled.
            if let Some(interval) = self.poll_interval {
                sleep(interval);
            }
        }
    }
}

impl AsyncStream {
    /// Create a new asynchronous stream for a connected client.
    pub fn new(addr: SocketAddr, sender: Sender<OutgoingMessage>) -> Self {
        Self::with_status(addr, sender, true)
    }

    /// Create a new disconnected asynchronous stream.
    /// This is used for getting the address of a disconnected stream.
    pub fn disconnected(addr: SocketAddr, sender: Sender<OutgoingMessage>) -> Self {
        Self::with_status(addr, sender, false)
    }

    /// Builds a stream handle with an explicit connection status.
    fn with_status(addr: SocketAddr, sender: Sender<OutgoingMessage>, connected: bool) -> Self {
        Self {
            addr,
            sender,
            connected,
        }
    }

    /// Send a message to the client.
    ///
    /// The message is queued for the app to deliver. A failure to queue it is ignored.
    ///
    /// # Panics
    ///
    /// Panics if this stream was created with `AsyncStream::disconnected`.
    pub fn send(&self, message: Message) {
        assert!(self.connected);

        let outgoing = OutgoingMessage::Message(self.addr, message);
        let _ = self.sender.send(outgoing);
    }

    /// Broadcast a message to all connected clients.
    ///
    /// Unlike `send`, this may also be called on a disconnected stream.
    /// A failure to queue the message is ignored.
    pub fn broadcast(&self, message: Message) {
        let outgoing = OutgoingMessage::Broadcast(message);
        let _ = self.sender.send(outgoing);
    }

    /// Get the address of the stream.
    pub fn peer_addr(&self) -> SocketAddr {
        self.addr
    }
}

impl AsyncSender {
    /// Send a message to the client identified by the socket address.
    pub fn send(&self, address: SocketAddr, message: Message) {
        self.0.send(OutgoingMessage::Message(address, message)).ok();
    }

    /// Broadcast a message to all connected clients.
    pub fn broadcast(&self, message: Message) {
        self.0.send(OutgoingMessage::Broadcast(message)).ok();
    }
}