Crate wstomp

§wstomp

A STOMP-over-WebSocket client library for Rust, built on top of awc and async-stomp.

This crate provides a simple client to connect to a STOMP-enabled WebSocket server (like RabbitMQ over Web-STOMP, or ActiveMQ). It handles the WebSocket connection, STOMP frame encoding/decoding, and WebSocket heartbeat (ping/pong) for you.

§Features

  • Connects to STOMP servers over WebSocket using awc.
  • Handles all STOMP protocol encoding and decoding via async-stomp.
  • Manages WebSocket ping/pong heartbeats automatically in a background task.
  • Provides a simple tokio::mpsc channel-based API (WStompClient) for sending and receiving STOMP frames.
  • Connection helpers for various authentication methods (login/passcode or an authorization token).
  • Optional rustls feature for SSL connections, with helpers that force HTTP/1.1 for compatibility with servers like SockJS.
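
To make the heartbeat bullet above more concrete, here is a minimal sketch of ping/pong bookkeeping as a small state machine. It is not wstomp's actual implementation: the `Heartbeat` and `HeartbeatAction` names are hypothetical, and the sketch assumes one outstanding ping per interval.

```rust
/// What the background task should do on a heartbeat tick (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum HeartbeatAction {
    /// No ping is outstanding: send a new one.
    SendPing,
    /// The previous ping was never answered: report a timeout.
    Timeout,
}

/// Tracks whether a pong is still owed for the last ping (hypothetical type).
#[derive(Debug, Default)]
struct Heartbeat {
    awaiting_pong: bool,
}

impl Heartbeat {
    /// Called once per heartbeat interval.
    fn on_tick(&mut self) -> HeartbeatAction {
        if self.awaiting_pong {
            HeartbeatAction::Timeout
        } else {
            self.awaiting_pong = true;
            HeartbeatAction::SendPing
        }
    }

    /// Called when a pong arrives from the server.
    fn on_pong(&mut self) {
        self.awaiting_pong = false;
    }
}
```

In this sketch a tick that finds an unanswered ping corresponds to the PingTimeout condition described under Error Handling.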

§Installation

Add this to your Cargo.toml:

[dependencies]
wstomp = "0.1.0" # Replace with the actual version
actix-rt = "2.0"

For SSL support, enable the rustls feature:

[dependencies]
wstomp = { version = "0.1.0", features = ["rustls"] }

§Usage

Here is a basic example of connecting, subscribing to a topic, and receiving messages.

use wstomp::{
    connect_with_pass,
    stomp::{FromServer, Message, ToServer, client::Subscriber},
    WStompEvent, WStompError,
};

#[actix_rt::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "ws://127.0.0.1:15674/ws/websocket"; // Example: RabbitMQ Web-STOMP (Note the "/websocket" suffix)
    let login = "guest".to_string();
    let passcode = "guest".to_string();

    // 1. Connect to the server
    println!("Connecting to {}...", url);
    let mut client = connect_with_pass(url, login, passcode)
        .await
        .expect("Failed to connect");

    println!("Connected! Subscribing...");

    // 2. Create a SUBSCRIBE frame
    let subscribe_frame = Subscriber::builder()
        .destination("queue.test")
        .id("subscription-1")
        .subscribe();

    // 3. Send the SUBSCRIBE frame
    client.send(subscribe_frame).await?;

    println!("Subscribed! Waiting for messages...");

    // 4. Listen for incoming messages
    while let Some(event) = client.recv().await {
        match event {
            // Receive messages from the server
            WStompEvent::Message(msg) => {
                match msg.content {
                    FromServer::Message {
                        destination,
                        message_id,
                        subscription,
                        body,
                        ..
                    } => {
                        println!("\n--- NEW MESSAGE ---");
                        println!("Destination: {}", destination);
                        println!("Subscription: {}", subscription);
                        println!("Message ID: {}", message_id);
                        if let Some(body_bytes) = body {
                            println!("Body: {}", String::from_utf8_lossy(&body_bytes));
                        }
                    }
                    FromServer::Receipt { receipt_id } => {
                        println!("Received receipt: {}", receipt_id);
                    }
                    FromServer::Connected { .. } => {
                        println!("Received CONNECTED frame (usually the first message)");
                    }
                    FromServer::Error { message, body, .. } => {
                        println!("Received ERROR frame: {}", message.unwrap_or_default());
                        if let Some(body_bytes) = body {
                            println!("Error Body: {}", String::from_utf8_lossy(&body_bytes));
                        }
                        break;
                    }
                    // Any other frame type: print it for inspection
                    other => println!("Received other frame: {:?}", other),
                }
            }
            // Handle errors
            WStompEvent::Error(err) => {
                match err {
                    WStompError::IncompleteStompFrame => {
                        // This is a warning, you can choose to ignore it or log it
                        eprintln!("Warning: Dropped incomplete STOMP frame.");
                    }
                    other_err => {
                        // These are more serious errors
                        eprintln!("Connection error: {}", other_err);
                        break;
                    }
                }
            }
        }
    }

    Ok(())
}
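
For orientation, the SUBSCRIBE frame built in step 2 corresponds to a short text frame on the wire. The sketch below encodes one by hand following the STOMP frame layout (command line, `name:value` headers, blank line, body, NUL terminator). The `encode_frame` and `subscribe_frame_bytes` helpers are hypothetical and are not part of wstomp or async-stomp; the exact header order and any extra headers that async-stomp emits may differ, and STOMP 1.2 header escaping is not applied here.

```rust
/// Encode a STOMP frame: command line, `name:value` header lines,
/// a blank line, the optional body, and a terminating NUL byte.
/// Illustrative only: no header escaping, no content-length header.
fn encode_frame(command: &str, headers: &[(&str, &str)], body: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(command.as_bytes());
    out.push(b'\n');
    for (name, value) in headers {
        out.extend_from_slice(name.as_bytes());
        out.push(b':');
        out.extend_from_slice(value.as_bytes());
        out.push(b'\n');
    }
    out.push(b'\n'); // blank line separates headers from the body
    out.extend_from_slice(body);
    out.push(0); // NUL byte terminates the frame
    out
}

/// The SUBSCRIBE frame from the usage example, as raw bytes.
fn subscribe_frame_bytes() -> Vec<u8> {
    encode_frame(
        "SUBSCRIBE",
        &[("id", "subscription-1"), ("destination", "queue.test")],
        b"",
    )
}
```

With wstomp you never build these bytes yourself; the sketch only shows what the library handles for you.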

§Connection with an Auth Token

If you need to pass an Authorization header (or any custom header):

use wstomp::connect_with_token;

#[actix_rt::main]
async fn main() {
    let url = "ws://my-server.com/ws/websocket";
    let token = "my-secret-jwt-token";

    let client = connect_with_token(url, token)
        .await
        .expect("Failed to connect");

    // ... use client
}

§Connection with SSL (rustls feature)

If you are connecting to a wss:// endpoint and need SSL, use the rustls feature and the connect_ssl_* helpers.

These helpers are specially configured to force HTTP/1.1, which can be necessary for compatibility with some WebSocket servers (like those using SockJS).

// Make sure to enable the "rustls" feature in Cargo.toml
use wstomp::connect_ssl_with_pass;

#[actix_rt::main]
async fn main() {
    let url = "wss://secure-server.com/ws";
    let login = "user".to_string();
    let passcode = "pass".to_string();

    let client = connect_ssl_with_pass(url, login, passcode)
        .await
        .expect("Failed to connect with SSL");

    println!("Connected securely!");
    // ... use client
}
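
If the endpoint URL comes from configuration, you may want to decide at runtime whether the plain or the connect_ssl_* helpers apply. The following is a minimal sketch of that decision based only on the URL scheme; `Transport` and `transport_for` are hypothetical names, not part of wstomp.

```rust
/// Which family of connection helpers a URL calls for (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum Transport {
    /// `ws://` endpoint: use the plain `connect*` helpers.
    Plain,
    /// `wss://` endpoint: use the `connect_ssl*` helpers (rustls feature).
    Tls,
}

/// Pick a transport from the URL scheme; returns `None` for other schemes.
fn transport_for(url: &str) -> Option<Transport> {
    if url.starts_with("wss://") {
        Some(Transport::Tls)
    } else if url.starts_with("ws://") {
        Some(Transport::Plain)
    } else {
        None
    }
}
```

A caller could match on the result and invoke connect_with_pass or connect_ssl_with_pass accordingly.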

§Auto-reconnect

Use the WStompConfig::build_and_connect_with_reconnection_cb method to automatically perform a full reconnect after an error.

use wstomp::{WStompClient, WStompConfig, WStompConnectError};

#[actix_rt::main]
async fn main() {
    let url = "wss://secure-server.com/ws";
    let session_token = "session_token";

    let cb = {
        move |wstomp_client_res: Result<WStompClient, WStompConnectError>| {
            async move {
                // Unwrap the wstomp client here, or react to an error.
                // On an error, you can return from the callback so that wstomp makes another connection attempt.
            }
        }
    };

    let res = WStompConfig::new(url)
        .ssl()
        .auth_token(session_token)
        .build_and_connect_with_reconnection_cb(cb);

    // ... do other work here, but don't exit immediately, as that will terminate the wstomp loop.
}

§Error Handling

The connection functions (connect, connect_ssl, etc.) return a Result<WStompClient, WStompConnectError>.

Once connected, the WStompClient::rx channel produces WStompEvent items, each of which is either a message or a WStompError.

  • WStompConnectError: An error that occurs during the initial WebSocket and STOMP CONNECT handshake.

  • WStompError: An error that occurs after a successful connection.

    • WsReceive / WsSend: A WebSocket protocol error.
    • StompDecoding / StompEncoding: A STOMP frame decoding/encoding error.
    • IncompleteStompFrame: A warning indicating that data was received but was not enough to form a complete STOMP frame. The client has dropped this data. This is often safe to ignore or log as a warning.
    • WebsocketClosed: The WebSocket was closed; the error may carry a close reason from the awc library.
    • PingFailed: A ping could not be sent over the WebSocket.
    • PingTimeout: No pong was received for the last ping.
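
The list above suggests a simple policy: treat IncompleteStompFrame as a warning and everything else as a reason to stop. Here is a minimal sketch of that policy. `ErrorKind` is a local stand-in that only mirrors the variant names listed above (the real WStompError variants may carry payloads), and `Reaction` and `react_to` are hypothetical.

```rust
/// Local stand-in mirroring the variant names of `WStompError`.
/// Payloads are omitted; the real variants may carry data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ErrorKind {
    WsReceive,
    WsSend,
    StompDecoding,
    StompEncoding,
    IncompleteStompFrame,
    WebsocketClosed,
    PingFailed,
    PingTimeout,
}

/// What the receive loop should do after an error (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
enum Reaction {
    /// Log the problem and keep reading events.
    LogAndContinue,
    /// Leave the receive loop; the connection is no longer usable.
    Stop,
}

/// Map each error kind to a reaction. Every variant is listed explicitly
/// so the compiler flags this match if a variant is added later.
fn react_to(kind: ErrorKind) -> Reaction {
    match kind {
        ErrorKind::IncompleteStompFrame => Reaction::LogAndContinue,
        ErrorKind::WsReceive
        | ErrorKind::WsSend
        | ErrorKind::StompDecoding
        | ErrorKind::StompEncoding
        | ErrorKind::WebsocketClosed
        | ErrorKind::PingFailed
        | ErrorKind::PingTimeout => Reaction::Stop,
    }
}
```

Whether decoding or encoding errors should really end the session is a judgment call; this sketch follows the usage example, which breaks out of the loop on anything other than IncompleteStompFrame.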

§License

This crate is licensed under “MIT” or “Apache-2.0”.

Re-exports§

pub use wstomp_event::WStompConnectError;
pub use wstomp_event::WStompError;
pub use wstomp_event::WStompEvent;

Modules§

stomp
wstomp_event

Structs§

WStompClient
Your client, which reads from the WebSocket and produces STOMP messages. It also takes STOMP messages from you and sends them through the WebSocket.
WStompConfig

Traits§

StompConnect

Functions§

connect
Connect to STOMP server without additional parameters
connect_ssl
Connect to STOMP server through SSL
connect_ssl_with_pass
Connect to STOMP server through SSL using password.
connect_ssl_with_token
Connect to STOMP server through SSL using authorization token.
connect_with_pass
Connect to STOMP server using password
connect_with_token
Connect to STOMP server using authorization token

Type Aliases§

WStompReceiver
WStompSender