Crate reconnecting_websocket

A wrapper around WebSocket that reconnects when the socket drops. Uses Backoff to determine the delay between reconnects.
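
As a rough illustration of how a backoff policy spaces out reconnect attempts, here is a minimal sketch of capped exponential backoff using only the standard library. The function name `reconnect_delay` and its parameters are hypothetical; this is not the crate's Backoff type, and the real policy may differ (for example by adding jitter).

```rust
use std::time::Duration;

/// Hypothetical helper, not part of this crate: the delay to wait before
/// reconnect attempt `attempt` (0-based). The delay starts at `min`,
/// doubles on every attempt and never exceeds `max`.
fn reconnect_delay(attempt: u32, min: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating so that large attempt counts cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication itself overflows, fall back to the cap.
    min.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

With `min` set to 100 ms and `max` set to 5 s, the first attempts wait 100 ms, 200 ms, 400 ms and so on, until the delay reaches the 5 s cap.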

§Features

  • tracing - enables the tracing crate and logs everything it’s doing
  • state-events - changes the Item type of the stream to be an enum that is either a message or a status change

Both are enabled by default.
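
To make the effect of state-events concrete, the sketch below uses a stand-in enum with the two variant names that appear in the example further down (`Event::Message` and `Event::State`). The generic parameters and the helper `message_only` are assumptions for illustration; the crate's real Event type and its state payload may look different.

```rust
// Stand-in mirroring the two variant names used in the crate's example.
// The real `Event` type is defined by the crate and may differ.
#[derive(Debug, PartialEq)]
enum Event<M, S> {
    /// An incoming message (in the crate's example this wraps a Result).
    Message(M),
    /// A connection status change.
    State(S),
}

/// Hypothetical helper: keep messages and discard status changes.
fn message_only<M, S>(event: Event<M, S>) -> Option<M> {
    match event {
        Event::Message(message) => Some(message),
        Event::State(_) => None,
    }
}
```

A consumer that only cares about payloads could apply such a filter to every stream item; with the feature disabled the stream yields the message results directly and no filtering is needed.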

§Usage

Input is the type of the messages this client sends to the server.

Output is the type of the messages this client receives from the server.

  1. Implement TryFrom<Input> for Message, where Input is your input type
  2. Implement TryFrom<Message> for your output type
  3. Both input and output need to implement Unpin and, if the tracing feature is enabled, Debug
  4. Use SocketBuilder to set the URL and configure backoff. get_proto_and_host can help construct the URL relative to the current window.location
  5. Call SocketBuilder::open to connect the socket. The errors open returns are likely fatal (invalid URL, blocked port); see WebSocket::open for details. The first connect is done in the builder so that it fails fast if one of these fatal errors occurs. The same kind of error can also occur on any reconnect, in which case it is returned by the Socket's Stream implementation
  6. The returned Socket can then be polled to get incoming messages. Call Socket::send to send messages, or use Socket::get_sender to get an UnboundedSender. Calling Socket::close or dropping the Socket drops the inner WebSocket, which sends a close frame and cleans up the event handlers
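
Steps 1 and 2 can be sketched as follows. To keep the snippet self-contained, `Message` here is a local stand-in with text and binary variants rather than the real WebSocket message type. The `Input::Bar` and `Output::Foo` variants mirror the names used in the example below, but the decimal-text wire format and the `ParseError` type are assumptions made for illustration.

```rust
use std::convert::TryFrom;

// Stand-in for the WebSocket message type so the sketch compiles on its own.
// In real code, use the Message type the crate works with instead.
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Text(String),
    Bytes(Vec<u8>),
}

/// Client-to-server messages (variant name taken from the example).
#[derive(Debug, PartialEq)]
enum Input {
    Bar(usize),
}

/// Server-to-client messages (variant name taken from the example).
#[derive(Debug, PartialEq)]
enum Output {
    Foo(usize),
}

/// Hypothetical error type for failed conversions.
#[derive(Debug, PartialEq)]
struct ParseError(String);

// Step 1: how an Input is encoded into an outgoing frame.
impl TryFrom<Input> for Message {
    type Error = ParseError;

    fn try_from(input: Input) -> Result<Self, Self::Error> {
        match input {
            // Assumed wire format: the number as decimal text.
            Input::Bar(n) => Ok(Message::Text(n.to_string())),
        }
    }
}

// Step 2: how an incoming frame is decoded into an Output.
impl TryFrom<Message> for Output {
    type Error = ParseError;

    fn try_from(message: Message) -> Result<Self, Self::Error> {
        match message {
            Message::Text(text) => text
                .trim()
                .parse::<usize>()
                .map(Output::Foo)
                .map_err(|e| ParseError(e.to_string())),
            Message::Bytes(_) => Err(ParseError("binary frames are not supported".into())),
        }
    }
}
```

Against an echo server, `Input::Bar(7)` would be sent as the text frame "7" and the echoed frame would decode to `Output::Foo(7)`. Both enums derive Debug, which covers the extra bound from step 3 when the tracing feature is enabled.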

§Example

tests/reconnect.rs

use cfg_if::cfg_if;
use futures::{select, FutureExt, StreamExt};
use gloo::timers::future::TimeoutFuture;
#[cfg(feature = "state-events")]
use reconnecting_websocket::Event;
use reconnecting_websocket::{Socket, SocketBuilder};

#[path = "./common.rs"]
mod common;

use common::{configure_tracing_once, Input, Output, ECHO_SERVER};
use tracing::{error, info};

#[cfg(all(test, target_arch = "wasm32"))]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[cfg(test)]
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), allow(unused))]
async fn reconnect() {
    use reconnecting_websocket::Error;
    use tracing::trace;

    const SEND_COUNT: usize = 10;

    configure_tracing_once();

    async fn send_messages(socket: &mut Socket<Input, Output>, count: usize) {
        let mut outstanding_packets = Vec::new();

        for i in 0..count {
            outstanding_packets.push(i);
            socket.send(Input::Bar(i)).await.expect("send");
        }

        let mut timeout = TimeoutFuture::new(5000).fuse();

        loop {
            select! {
                r = socket.next() => {
                    let r = r.expect("next None");
                    fn handle_message(o: Result<Output, Error<Input, Output>>, outstanding_packets: &mut Vec<usize>) {
                        match o {
                            Ok(Output::Foo(n)) => {
                                trace!("Output::Foo({n})");
                                outstanding_packets.retain(|v| *v != n);
                            },
                            Err(e) => {
                                error!("next err: {e:?}");
                            }
                        }
                    }
                    cfg_if! {
                        if #[cfg(feature = "state-events")] {
                            match r {
                                Event::Message(m) => handle_message(m, &mut outstanding_packets),
                                Event::State(s) => info!("State changed: {s:?}"),
                            }
                        } else {
                            handle_message(r, &mut outstanding_packets);
                        }
                    }

                    if outstanding_packets.is_empty() {
                        break;
                    }
                },

                _ = timeout => {
                    panic!("Timed out before receiving all responses (outstanding: {outstanding_packets:?})");
                },
            }
        }
    }

    let mut socket = SocketBuilder::<Input, Output>::new(ECHO_SERVER.to_string()).open().unwrap();

    info!("First test (before reconnect)");
    send_messages(&mut socket, SEND_COUNT).await;

    // Close the inner socket so the wrapper has to reconnect
    socket.close_socket(None, Some("test close"));

    info!("Second test (after reconnect)");
    send_messages(&mut socket, SEND_COUNT).await;

    info!("All done");
}

Structs§

  • A wrapper around WebSocket that reconnects when the socket drops. Uses Backoff to determine the delay between reconnects.
  • Builder for Socket. Uses the DEFAULT_* consts for backoff and retry config.
  • A handle that implements Sink for sending messages from the client to the server.

Enums§

Constants§

Traits§

  • Trait expressing the requirements for a socket input type. You don’t need to implement it directly: there is a blanket implementation for types that implement Unpin and Debug and for which Message: TryFrom<Self> holds.
  • Trait expressing the requirements for a socket output type. You don’t need to implement it directly: there is a blanket implementation for types that implement Unpin and Debug and for which Self: TryFrom<Message> holds.

Functions§