websocket-web 0.1.9

WebSockets on the web 🕸️ — WebSocket support in a JavaScript runtime environment, usually a web browser.
Documentation
use futures_util::{SinkExt, StreamExt};
use tokio::{
    io::{duplex, AsyncReadExt, AsyncWriteExt},
    sync::{mpsc, oneshot},
};
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::spawn_local;
use wasm_bindgen_test::wasm_bindgen_test;

use websocket_web::*;

mod util;
use util::ResultExt;

fn url() -> String {
    let host = web_sys::window().unwrap().location().hostname().unwrap();
    format!("ws://{host}:8765")
}

async fn echo(interface: Option<Interface>) {
    let url = url();
    let mut builder = WebSocketBuilder::new(&url);
    if let Some(interface) = interface {
        builder.set_interface(interface);
    }

    log!("Connecting to {url} using {interface:?}");
    let mut socket = builder.connect().await.expect_log("connect failed");
    log!("Connected: {socket:?}");

    let msg = "hi123";
    log!("Sending text message: {msg}");
    socket.send(msg).await.unwrap_log();

    log!("Receiving message");
    let recved = socket.next().await.unwrap_log().unwrap_log();
    log!("Received: {recved:?}");
    assert_eq!(recved, Msg::Text(msg.to_string()));

    let msg2 = [1, 2, 3, 11, 12, 13];
    log!("Sending binary message: {msg2:?}");
    socket.send(&msg2[..]).await.unwrap_log();

    log!("Receiving message");
    let recved2 = socket.next().await.unwrap_log().unwrap_log();
    log!("Received: {recved2:?}");
    assert_eq!(recved2, Msg::Binary(msg2.to_vec()));

    socket.close_with_reason(CloseCode::NormalClosure, "goodbye");
}

#[wasm_bindgen_test]
async fn echo_auto() {
    echo(None).await;
}

#[wasm_bindgen_test]
async fn echo_stream() {
    if !Interface::Stream.is_supported() {
        log!("WebSocketStream not supported");
        return;
    }
    echo(Some(Interface::Stream)).await;
}

#[wasm_bindgen_test]
async fn echo_standard() {
    echo(Some(Interface::Standard)).await;
}

async fn backpressure(interface: Option<Interface>) {
    const CNT: usize = 100_000;
    const CLOSE_MSG: &str = "CLOSE-123";

    let url = url();
    let mut builder = WebSocketBuilder::new(&url);
    if let Some(interface) = interface {
        builder.set_interface(interface);
    }

    log!("Connecting to {url} using {interface:?}");
    let socket = builder.connect().await.expect_log("connect failed");
    log!("Connected: {socket:?}");

    let closed = socket.closed();
    let (mut tx, mut rx) = socket.into_split();
    let (local_tx, mut local_rx) = mpsc::unbounded_channel();
    let (total_tx, total_rx) = oneshot::channel();

    spawn_local(async move {
        let mut total = 0;
        let mut rxed = 0;
        while let Some(msg) = rx.next().await {
            let msg = msg.expect_log("receive failed");
            let local_msg = local_rx.recv().await.unwrap_log();
            if msg != local_msg {
                panic_log!("recevied wrong message");
            }

            total += msg.len();
            rxed += 1;

            if rxed % 1000 == 0 {
                log!("Received message {rxed} / {CNT}");
            }
        }
        log!("Received {rxed} / {CNT} messages of total size {total} bytes");
        let None = local_rx.recv().await else { panic_log!("message missing") };

        log!("Waiting for close info");
        let reason = closed.await;
        log!("Close reason: {reason}");
        if reason.code != CloseCode::Other(3999) {
            panic_log!("Invalid close code {reason}");
        }
        if reason.reason != CLOSE_MSG {
            panic_log!("Invalid close reason: {reason}");
        }

        total_tx.send(total).unwrap_log();
    });

    for i in 1..=CNT {
        let len = i % 10_000;
        if i % 1000 == 0 {
            log!("Sending message {i} / {CNT} of size {len} bytes");
        }

        let data = vec![i as u8; len];
        tx.feed(data.clone()).await.expect_log("feed failed");
        local_tx.send(Msg::Binary(data)).unwrap_log();
    }
    log!("Flushing");
    <WebSocketSender as SinkExt<Vec<u8>>>::flush(&mut tx).await.expect_log("flush failed");

    log!("Sending {CLOSE_MSG}");
    tx.send(CLOSE_MSG).await.expect_log("send CLOSE failed");

    log!("Waiting for closure");
    drop(local_tx);
    let total = total_rx.await.expect_log("not okay");
    log!("Received {total} bytes");
}

#[wasm_bindgen_test]
async fn backpressure_stream() {
    if !Interface::Stream.is_supported() {
        log!("WebSocketStream not supported");
        return;
    }
    backpressure(Some(Interface::Stream)).await;
}

#[wasm_bindgen_test]
async fn backpressure_standard() {
    backpressure(Some(Interface::Standard)).await;
}

async fn io(interface: Option<Interface>) {
    const CNT: usize = 100_000;
    const CLOSE_MSG: &str = "CLOSE-123";

    let url = url();
    let mut builder = WebSocketBuilder::new(&url);
    if let Some(interface) = interface {
        builder.set_interface(interface);
    }

    log!("Connecting to {url} using {interface:?}");
    let socket = builder.connect().await.expect_log("connect failed");
    log!("Connected: {socket:?}");

    let (mut tx, mut rx) = socket.into_split();

    let (mut local_tx, mut local_rx) = duplex(100_000);
    let (total_tx, total_rx) = oneshot::channel();

    spawn_local(async move {
        let mut total = 0;
        loop {
            let mut buf = vec![0; 1024];
            let n = rx.read(&mut buf).await.expect_log("read failed");
            buf.truncate(n);
            if n == 0 {
                break;
            }

            let mut local_buf = vec![0; buf.len()];
            local_rx.read_exact(&mut local_buf).await.expect_log("local data ended");

            if buf != local_buf {
                panic_log!("recevied wrong message");
            }

            total += buf.len();
        }
        total_tx.send(total).unwrap_log();
    });

    let mut sent = 0;
    for i in 1..=CNT {
        let len = i % 10_000;
        if i % 1000 == 0 {
            log!("Sending message {i} / {CNT} of size {len} bytes");
        }

        let data = vec![i as u8; len];
        tx.write_all(&data).await.expect_log("write_all failed");
        local_tx.write_all(&data).await.unwrap_log();
        sent += data.len();
    }
    log!("Flushing");
    <WebSocketSender as AsyncWriteExt>::flush(&mut tx).await.expect_log("flush failed");
    local_tx.flush().await.unwrap_log();

    log!("Sending {CLOSE_MSG}");
    tx.send(CLOSE_MSG).await.expect_log("send CLOSE failed");

    log!("Waiting for closure");
    drop(local_tx);
    let total = total_rx.await.expect_log("not okay");
    log!("Received {total} bytes");
    assert_eq!(total, sent);
}

#[wasm_bindgen_test]
async fn io_stream() {
    if !Interface::Stream.is_supported() {
        msg!("WebSocketStream not supported");
        return;
    }
    io(Some(Interface::Stream)).await;
}

#[wasm_bindgen_test]
async fn io_standard() {
    io(Some(Interface::Standard)).await;
}