async-stomp

An async STOMP library for Rust, using the Tokio stack.
This is a fork of tokio-stomp-2, created to keep the crate maintained.
Overview
This library provides a fully asynchronous Rust implementation of STOMP 1.2 (Simple/Streaming Text Oriented Messaging Protocol). It lets Rust applications communicate with message brokers that support STOMP, such as ActiveMQ and RabbitMQ.
The library is built on top of Tokio, leveraging its async I/O capabilities to provide non-blocking message handling.
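For orientation, a STOMP 1.2 frame is plain text: a command line, zero or more name:value header lines, a blank line, an optional body, and a terminating NUL byte. The sketch below builds such a frame by hand using only the standard library. `encode_frame` is a hypothetical helper for illustration, not part of async-stomp, and it deliberately skips header escaping and the `content-length` header.

```rust
/// Encode one STOMP frame: COMMAND, header lines, a blank line, the body, then NUL.
/// Hypothetical helper for illustration only; not part of async-stomp.
fn encode_frame(command: &str, headers: &[(&str, &str)], body: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    out.extend_from_slice(command.as_bytes());
    out.push(b'\n');
    for (name, value) in headers {
        out.extend_from_slice(name.as_bytes());
        out.push(b':');
        out.extend_from_slice(value.as_bytes());
        out.push(b'\n');
    }
    // An empty line separates the headers from the body.
    out.push(b'\n');
    out.extend_from_slice(body);
    // Every frame ends with a NUL octet.
    out.push(0);
    out
}
```

Calling `encode_frame("SEND", &[("destination", "queue.test")], b"hi")` yields the bytes `SEND\ndestination:queue.test\n\nhi\0`. When using the library you work with `ToServer` and `FromServer` values rather than raw bytes.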
Features
- Async STOMP client for Rust using Tokio
- Support for all STOMP operations:
  - Connection management (connect, disconnect)
  - Message publishing
  - Subscriptions
  - Acknowledgments (auto, client, client-individual modes)
  - Transactions
- TLS/SSL support for secure connections
- Custom headers support for advanced configurations
- Built-in heartbeat support
- Error handling with detailed error information
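As background for the heartbeat feature: in STOMP 1.2 each side advertises a `heart-beat` header of the form `<can send every N ms>,<wants to receive every M ms>`, and the effective interval in each direction is the larger of the sender's and receiver's values, or no heartbeats at all if either value is 0. The following is a minimal standard-library sketch of that rule. The function names are hypothetical, and this is not a description of how async-stomp implements heartbeats internally.

```rust
use std::time::Duration;

/// Parse a `heart-beat` header value such as "10000,5000" into (can_send_ms, wants_ms).
fn parse_heart_beat(value: &str) -> Option<(u64, u64)> {
    let (can_send, wants) = value.split_once(',')?;
    Some((can_send.trim().parse().ok()?, wants.trim().parse().ok()?))
}

/// Apply the STOMP 1.2 negotiation rule.
/// Returns (client_to_server, server_to_client); `None` means no heartbeats that way.
fn negotiate(client: (u64, u64), server: (u64, u64)) -> (Option<Duration>, Option<Duration>) {
    let pick = |can_send: u64, wants: u64| {
        if can_send == 0 || wants == 0 {
            None
        } else {
            Some(Duration::from_millis(can_send.max(wants)))
        }
    };
    let (cx, cy) = client;
    let (sx, sy) = server;
    // Client sends at max(cx, sy); server sends at max(sx, cy).
    (pick(cx, sy), pick(sx, cy))
}
```

For example, a client advertising `10000,5000` against a server advertising `0,20000` ends up sending a heartbeat every 20000 ms and receiving none.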
Installation
Add this to your Cargo.toml (the examples below also use the tokio, futures, and anyhow crates):
[dependencies]
async-stomp = "0.6"
Usage Guide
Connecting to a STOMP Server
The primary entry point is the Connector builder, which allows you to configure and establish a connection:
use async_stomp::client::Connector;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Basic connection with login credentials
    let connection = Connector::builder()
        .server("localhost:61613")
        .virtualhost("/")
        .login("guest".to_string())
        .passcode("guest".to_string())
        .connect()
        .await?;

    // Same settings, plus custom headers
    let connection_with_headers = Connector::builder()
        .server("localhost:61613")
        .virtualhost("/")
        .login("guest".to_string())
        .passcode("guest".to_string())
        .headers(vec![("client-id".to_string(), "my-client".to_string())])
        .connect()
        .await?;

    Ok(())
}
Sending a message to a queue
use futures::prelude::*;
use async_stomp::client::Connector;
use async_stomp::ToServer;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let mut conn = Connector::builder()
        .server("127.0.0.1:61613")
        .virtualhost("/")
        .login("guest".to_string())
        .passcode("guest".to_string())
        .connect()
        .await?;

    // Send a plain message
    conn.send(
        ToServer::Send {
            destination: "queue.test".into(),
            transaction: None,
            headers: None,
            body: Some(b"Hello there rustaceans!".to_vec()),
        }
        .into(),
    )
    .await?;

    // Send a message with custom headers
    conn.send(
        ToServer::Send {
            destination: "queue.test".into(),
            transaction: None,
            headers: Some(vec![
                ("content-type".to_string(), "text/plain".to_string()),
                ("priority".to_string(), "high".to_string()),
            ]),
            body: Some(b"Important message!".to_vec()),
        }
        .into(),
    )
    .await?;

    Ok(())
}
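Custom header names and values like those above are subject to escaping on the wire: STOMP 1.2 requires backslash, carriage return, line feed, and colon to be written as `\\`, `\r`, `\n`, and `\c` in every frame except CONNECT and CONNECTED. The sketch below shows that rule with the standard library only. Both functions are hypothetical illustrations, and whether async-stomp applies this escaping for you is an assumption to verify rather than something this README states.

```rust
/// Escape a header name or value as STOMP 1.2 requires on the wire.
/// Hypothetical helper; not part of async-stomp.
fn escape_header(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '\r' => out.push_str("\\r"),
            '\n' => out.push_str("\\n"),
            ':' => out.push_str("\\c"),
            other => out.push(other),
        }
    }
    out
}

/// Reverse of `escape_header`. Returns `None` on an undefined escape sequence,
/// which the specification treats as a fatal protocol error.
fn unescape_header(escaped: &str) -> Option<String> {
    let mut out = String::with_capacity(escaped.len());
    let mut chars = escaped.chars();
    while let Some(ch) = chars.next() {
        if ch != '\\' {
            out.push(ch);
            continue;
        }
        // A backslash must be followed by one of the four defined escape letters.
        match chars.next()? {
            '\\' => out.push('\\'),
            'r' => out.push('\r'),
            'n' => out.push('\n'),
            'c' => out.push(':'),
            _ => return None,
        }
    }
    Some(out)
}
```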
Subscribing to a queue and receiving messages
use futures::prelude::*;
use async_stomp::client::Connector;
use async_stomp::client::Subscriber;
use async_stomp::FromServer;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let mut conn = Connector::builder()
        .server("127.0.0.1:61613")
        .virtualhost("/")
        .login("guest".to_string())
        .passcode("guest".to_string())
        .connect()
        .await?;

    // Build and send the SUBSCRIBE frame
    let subscribe = Subscriber::builder()
        .destination("queue.test")
        .id("subscription-1")
        .subscribe();
    conn.send(subscribe).await?;

    // Process frames as they arrive; stop on the first error
    while let Some(message) = conn.next().await {
        match message {
            Ok(msg) => {
                if let FromServer::Message {
                    destination,
                    message_id,
                    body,
                    ..
                } = msg.content
                {
                    println!("Received message from {}", destination);
                    println!("Message ID: {}", message_id);
                    if let Some(body) = body {
                        println!("Body: {}", String::from_utf8_lossy(&body));
                    }
                }
            }
            Err(e) => {
                eprintln!("Error receiving message: {:?}", e);
                break;
            }
        }
    }

    Ok(())
}
Using transactions
Transactions allow you to group multiple operations together and commit or abort them as a unit:
use futures::prelude::*;
use async_stomp::{client::Connector, ToServer};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let mut conn = Connector::builder()
        .server("127.0.0.1:61613")
        .virtualhost("/")
        .login("guest".to_string())
        .passcode("guest".to_string())
        .connect()
        .await?;

    // Begin the first transaction
    let transaction_id = "tx-1";
    conn.send(
        ToServer::Begin {
            transaction: transaction_id.to_string(),
        }
        .into(),
    )
    .await?;

    // Send two messages as part of the transaction
    conn.send(
        ToServer::Send {
            destination: "queue.test".into(),
            transaction: Some(transaction_id.to_string()),
            headers: None,
            body: Some(b"Message 1 in transaction".to_vec()),
        }
        .into(),
    )
    .await?;
    conn.send(
        ToServer::Send {
            destination: "queue.test".into(),
            transaction: Some(transaction_id.to_string()),
            headers: None,
            body: Some(b"Message 2 in transaction".to_vec()),
        }
        .into(),
    )
    .await?;

    // Commit the transaction
    conn.send(
        ToServer::Commit {
            transaction: transaction_id.to_string(),
        }
        .into(),
    )
    .await?;

    // Begin a second transaction, then abort it instead of committing
    let transaction_id = "tx-2";
    conn.send(
        ToServer::Begin {
            transaction: transaction_id.to_string(),
        }
        .into(),
    )
    .await?;
    conn.send(
        ToServer::Send {
            destination: "queue.test".into(),
            transaction: Some(transaction_id.to_string()),
            headers: None,
            body: Some(b"This message will be aborted".to_vec()),
        }
        .into(),
    )
    .await?;
    conn.send(
        ToServer::Abort {
            transaction: transaction_id.to_string(),
        }
        .into(),
    )
    .await?;

    Ok(())
}
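To make the "commit or abort as a unit" behaviour in the example above concrete, here is a toy in-memory model of what a broker does with transactional SEND frames: it buffers them under the transaction id, releases them on COMMIT, and discards them on ABORT. `ToyBroker` is entirely hypothetical and uses only the standard library. Real brokers differ in details such as error reporting and how ACK frames take part in transactions.

```rust
use std::collections::HashMap;

/// Toy stand-in for a broker queue; hypothetical, for illustration only.
#[derive(Default)]
struct ToyBroker {
    /// Messages visible to consumers.
    delivered: Vec<String>,
    /// Messages held back per open transaction id.
    pending: HashMap<String, Vec<String>>,
}

impl ToyBroker {
    /// BEGIN: open a transaction. Returns false if the id is already in use.
    fn begin(&mut self, tx: &str) -> bool {
        if self.pending.contains_key(tx) {
            return false;
        }
        self.pending.insert(tx.to_string(), Vec::new());
        true
    }

    /// SEND: deliver at once, or buffer when an open transaction id is given.
    fn send(&mut self, body: &str, tx: Option<&str>) -> bool {
        match tx {
            None => {
                self.delivered.push(body.to_string());
                true
            }
            Some(id) => match self.pending.get_mut(id) {
                Some(buffer) => {
                    buffer.push(body.to_string());
                    true
                }
                // Unknown or already finished transaction.
                None => false,
            },
        }
    }

    /// COMMIT: release every buffered message, in order.
    fn commit(&mut self, tx: &str) -> bool {
        match self.pending.remove(tx) {
            Some(buffer) => {
                self.delivered.extend(buffer);
                true
            }
            None => false,
        }
    }

    /// ABORT: discard every buffered message.
    fn abort(&mut self, tx: &str) -> bool {
        self.pending.remove(tx).is_some()
    }
}
```

Replaying the example against this model, the two messages in tx-1 become visible only after the commit, and the message in tx-2 never does.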
Secure Connection with TLS/SSL
use futures::prelude::*;
use async_stomp::client::Connector;
use async_stomp::client::Subscriber;
use async_stomp::ToServer;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Placeholder host and port; replace with your broker's TLS endpoint
    let server_address = "secure-stomp-server.example.com:61614";

    let mut conn = Connector::builder()
        .server(server_address)
        .virtualhost("secure-stomp-server.example.com")
        .login("guest".to_string())
        .passcode("guest".to_string())
        // Enable TLS for this connection
        .use_tls(true)
        // Server name to use for the TLS session
        .tls_server_name("secure-stomp-server.example.com")
        .connect()
        .await?;

    // Subscribe over the secure connection
    let subscribe = Subscriber::builder()
        .destination("secure.topic")
        .id("secure-subscription")
        .subscribe();
    conn.send(subscribe).await?;

    // Send a message over the secure connection
    conn.send(
        ToServer::Send {
            destination: "secure.topic".into(),
            transaction: None,
            headers: None,
            body: Some(b"This is a secure message!".to_vec()),
        }
        .into(),
    )
    .await?;

    Ok(())
}
Advanced Usage
Acknowledgment Modes
STOMP defines three acknowledgment modes for message consumption (auto, client, and client-individual), selected with the ack header when subscribing:
use futures::prelude::*;
use async_stomp::{client::{Connector, Subscriber}, ToServer};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let mut conn = Connector::builder()
        .server("localhost:61613")
        .virtualhost("/")
        .connect()
        .await?;

    // Subscribe in client acknowledgment mode
    let subscribe_client_ack = Subscriber::builder()
        .destination("queue.important")
        .id("sub-with-ack")
        .headers(vec![("ack".to_string(), "client".to_string())])
        .subscribe();
    conn.send(subscribe_client_ack).await?;

    // Acknowledge a message. In STOMP 1.2 the id is the value of the
    // `ack` header on the MESSAGE frame being acknowledged.
    conn.send(
        ToServer::Ack {
            id: "message-123".to_string(),
            transaction: None,
        }
        .into(),
    )
    .await?;

    // Negatively acknowledge (reject) a message
    conn.send(
        ToServer::Nack {
            id: "message-456".to_string(),
            transaction: None,
        }
        .into(),
    )
    .await?;

    Ok(())
}
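The practical difference between the modes is how much a single ACK frame covers. Under STOMP 1.2, `auto` needs no ACK frames at all, `client` is cumulative (an ACK covers the named message and everything delivered before it on the subscription), and `client-individual` covers only the named message. The sketch below models that with the standard library. `AckSketch` and `settled_by_ack` are hypothetical names chosen to avoid confusion with the crate's own types.

```rust
/// The three `ack` header values defined by STOMP 1.2.
/// Hypothetical type for illustration; not the crate's own `AckMode`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum AckSketch {
    Auto,
    Client,
    ClientIndividual,
}

impl AckSketch {
    /// Value to place in the SUBSCRIBE frame's `ack` header.
    fn header_value(self) -> &'static str {
        match self {
            AckSketch::Auto => "auto",
            AckSketch::Client => "client",
            AckSketch::ClientIndividual => "client-individual",
        }
    }
}

/// Given ack ids in delivery order, return the ones settled by one ACK for `acked`.
fn settled_by_ack<'a>(mode: AckSketch, delivered: &[&'a str], acked: &str) -> Vec<&'a str> {
    // In auto mode the broker treats every delivered message as acknowledged already.
    if mode == AckSketch::Auto {
        return delivered.to_vec();
    }
    let pos = match delivered.iter().position(|id| *id == acked) {
        Some(pos) => pos,
        None => return Vec::new(),
    };
    match mode {
        // Cumulative: this message and everything delivered before it.
        AckSketch::Client => delivered[..=pos].to_vec(),
        // Only the named message.
        _ => vec![delivered[pos]],
    }
}
```

With messages m1, m2, m3 delivered in that order, acknowledging m2 settles m1 and m2 in client mode but only m2 in client-individual mode.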
Connection Lifecycle Management
use futures::prelude::*;
use async_stomp::{client::Connector, ToServer};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let mut conn = Connector::builder()
        .server("localhost:61613")
        .virtualhost("/")
        .connect()
        .await?;

    // Disconnect gracefully, asking the server to confirm with a receipt
    conn.send(
        ToServer::Disconnect {
            receipt: Some("disconnect-receipt".to_string()),
        }
        .into(),
    )
    .await?;

    // Wait for the server's reply before exiting
    if let Some(_reply) = conn.next().await {
        // Inspect the reply here, e.g. check that it is the requested receipt
    }

    Ok(())
}
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
Licensed under the EUPL.