# iridium-stomp
[](https://github.com/bsiegfreid/iridium-stomp/actions/workflows/ci.yml)
An asynchronous STOMP 1.2 client library for Rust.
> **Early Development**: This library is heavily tested (300+ unit and fuzz tests) but has not yet been battle-tested in production environments. APIs may change. Use with appropriate caution.
## Design Goals
- **Async-first architecture** — Built on Tokio from the ground up.
- **Correct frame parsing** — Handles arbitrary TCP chunk boundaries, binary
bodies with embedded NULs, and the full STOMP 1.2 frame format.
- **Automatic heartbeat management** — Negotiates heartbeat intervals per the
spec, sends heartbeats when idle, and detects missed heartbeats from the
server.
- **Transparent reconnection** — Stability-aware exponential backoff, automatic
resubscription, and pending message cleanup on disconnect.
- **Small, explicit API** — One way to do things, clearly documented, easy to
understand.
- **Production-ready testing** — 150+ tests including fuzz testing, stress
testing, and regression capture for previously-failing edge cases.
## Quick Start
```rust,no_run
use iridium_stomp::{Connection, Frame, ReceivedFrame};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to a STOMP broker
let conn = Connection::connect(
"127.0.0.1:61613",
"guest",
"guest",
Connection::DEFAULT_HEARTBEAT, // 10 seconds send/receive
).await?;
// Send a message
let msg = Frame::new("SEND")
.header("destination", "/queue/test")
.set_body(b"hello from iridium-stomp".to_vec());
conn.send_frame(msg).await?;
// Subscribe to a queue
let mut subscription = conn
.subscribe("/queue/test", iridium_stomp::AckMode::Auto)
.await?;
// Receive messages using the Stream trait
use futures::StreamExt;
while let Some(frame) = subscription.next().await {
println!("Received: {:?}", frame);
}
conn.close().await;
Ok(())
}
```
## Features
### Heartbeat Negotiation
Heartbeats are negotiated automatically during connection. Use the provided
constants or the `Heartbeat` struct for type-safe configuration:
```rust,ignore
use iridium_stomp::{Connection, Heartbeat};
// Use predefined constants
let conn = Connection::connect(addr, login, pass, Connection::DEFAULT_HEARTBEAT).await?;
let conn = Connection::connect(addr, login, pass, Connection::NO_HEARTBEAT).await?;
// Or use the Heartbeat struct for custom intervals
let hb = Heartbeat::new(5000, 10000); // send every 5s, expect every 10s
let conn = Connection::connect(addr, login, pass, &hb.to_string()).await?;
// Create from Duration for symmetric intervals
use std::time::Duration;
let hb = Heartbeat::from_duration(Duration::from_secs(15));
```
The library handles the negotiation (taking the maximum of client and server
preferences), sends heartbeats when the connection is idle, and closes the
connection if the server stops responding.
### Subscription Management
Subscribe to destinations with automatic resubscription on reconnect:
```rust,ignore
use iridium_stomp::connection::AckMode;
// Auto-acknowledge (server considers delivered immediately)
let sub = conn.subscribe("/queue/events", AckMode::Auto).await?;
// Client-acknowledge (cumulative)
let sub = conn.subscribe("/queue/jobs", AckMode::Client).await?;
// Client-individual (per-message acknowledgement)
let sub = conn.subscribe("/queue/tasks", AckMode::ClientIndividual).await?;
```
For broker-specific headers (durable subscriptions, selectors, etc.):
```rust,ignore
use iridium_stomp::SubscriptionOptions;
use iridium_stomp::connection::AckMode;
let options = SubscriptionOptions {
headers: vec![
("activemq.subscriptionName".into(), "my-durable-sub".into()),
("selector".into(), "priority > 5".into()),
],
durable_queue: None,
};
let sub = conn.subscribe_with_options("/topic/events", AckMode::Client, options).await?;
```
### Cloneable Connection
The `Connection` is cloneable and thread-safe. Multiple tasks can share the
same connection:
```rust,ignore
let conn = Connection::connect(...).await?;
let conn2 = conn.clone();
tokio::spawn(async move {
conn2.send_frame(some_frame).await.unwrap();
});
```
### Custom CONNECT Headers
Use `ConnectOptions` to customize the STOMP CONNECT frame for broker-specific
requirements like durable subscriptions or virtual hosts:
```rust,ignore
use iridium_stomp::{Connection, ConnectOptions};
let options = ConnectOptions::new()
.client_id("my-durable-client") // Required for ActiveMQ durable subscriptions
.host("/production") // Virtual host (RabbitMQ)
.accept_version("1.1,1.2") // Version negotiation
.header("custom-key", "value"); // Broker-specific headers
let conn = Connection::connect_with_options(
"localhost:61613",
"guest",
"guest",
Connection::DEFAULT_HEARTBEAT,
options,
).await?;
```
### Receipt Confirmation
Request delivery confirmation from the broker using RECEIPT frames:
```rust,ignore
use iridium_stomp::{Connection, Frame};
use std::time::Duration;
let msg = Frame::new("SEND")
.header("destination", "/queue/important")
.receipt("msg-123") // Request receipt with this ID
.set_body(b"critical data".to_vec());
// Send and wait for confirmation (with timeout)
conn.send_frame_confirmed(msg, Duration::from_secs(5)).await?;
// Or handle receipts manually
let msg = Frame::new("SEND")
.header("destination", "/queue/test")
.receipt("msg-456")
.set_body(b"data".to_vec());
conn.send_frame_with_receipt(msg).await?;
conn.wait_for_receipt("msg-456", Duration::from_secs(5)).await?;
```
### Connection Error Handling
Connection failures (invalid credentials, server unreachable) are reported immediately:
```rust,ignore
use iridium_stomp::Connection;
use iridium_stomp::connection::ConnError;
match Connection::connect("localhost:61613", "user", "pass", Connection::DEFAULT_HEARTBEAT).await {
Ok(conn) => {
// Connected successfully
}
Err(ConnError::ServerRejected(err)) => {
// Authentication failed or server rejected connection
eprintln!("Server rejected: {}", err.message);
}
Err(ConnError::Io(err)) => {
// Network error (connection refused, timeout, etc.)
eprintln!("Network error: {}", err);
}
Err(err) => {
eprintln!("Connection failed: {}", err);
}
}
```
### Server Error Handling
Errors received after connection are surfaced as `ReceivedFrame::Error`:
```rust,ignore
use iridium_stomp::{Connection, ReceivedFrame};
while let Some(received) = conn.next_frame().await {
match received {
ReceivedFrame::Frame(frame) => {
println!("Got {}: {:?}", frame.command, frame.get_header("destination"));
}
ReceivedFrame::Error(err) => {
eprintln!("Server error: {}", err.message);
if let Some(body) = &err.body {
eprintln!("Details: {}", body);
}
break;
}
}
}
```
### Reconnection Backoff
When a connection drops, the library automatically reconnects with exponential
backoff and resubscribes to all active subscriptions. The backoff behavior is
stability-aware: it distinguishes between a long-lived connection that dropped
(transient failure) and a connection that dies immediately after connecting
(persistent failure).
**Stability-aware backoff:**
- If the connection was alive for at least `max(current_backoff, 5)` seconds,
it is considered stable. On disconnect, backoff resets to 1 second for a fast
reconnect.
- If the connection dies quickly after establishing (e.g., the broker closes the
connection during resubscription), backoff doubles on each attempt up to a 30
second cap: 1s → 2s → 4s → 8s → 16s → 30s.
- Authentication failures during reconnection continue exponential backoff
without checking connection stability (they do not trigger a backoff reset).
| Stable connection drops after minutes | Reconnect in 1s (backoff resets) |
| Broker rejects subscriptions and closes connection | 1s, 2s, 4s, 8s, 16s, 30s cap |
| Authentication failure on reconnect | Exponential backoff (no stability-based reset) |
| Broker unreachable | Exponential backoff up to 30s |
#### Broker-Specific Notes
**Artemis**: When Artemis rejects a SUBSCRIBE due to permissions, it sends a
STOMP ERROR frame but does **not** close the TCP connection. This violates the
[STOMP 1.2 specification](https://stomp.github.io/stomp-specification-1.2.html),
which states: "The server MAY send ERROR frames if something goes wrong. In this
case, it **MUST** then close the connection just after sending the ERROR frame."
Because Artemis keeps the connection open, the reconnect backoff path is never
triggered — errors are delivered inline on the existing connection, potentially
causing a rapid error loop if your application automatically retries
subscriptions. The library surfaces these errors via `ReceivedFrame::Error` for
application-level handling; you may need to implement your own rate limiting or
circuit breaker for Artemis deployments.
**RabbitMQ**: Follows the STOMP spec correctly — ERROR frames are followed by
connection close, which triggers the reconnect backoff as expected.
## CLI
An interactive CLI is included for testing and ad-hoc messaging. Install with
the `cli` feature:
```bash
cargo install iridium-stomp --features cli
```
Or run from source:
```bash
cargo run --features cli --bin stomp -- --help
```
### CLI Usage
```bash
# Connect and subscribe to a queue
stomp -a 127.0.0.1:61613 -s /queue/test
# Connect with custom credentials
stomp -a broker.example.com:61613 -l myuser -p mypass -s /queue/events
# Subscribe to multiple queues
stomp -s /queue/orders -s /queue/notifications
# Enable TUI mode for live monitoring
stomp --tui -a 127.0.0.1:61613 -s /topic/events
```
### TUI Mode
The `--tui` flag enables a full terminal interface with:
- **Activity panel** - Live subscription counts with color coding
- **Message panel** - Scrollable message history with timestamps
- **Heartbeat indicator** - Animated pulse showing connection health
- **Command history** - Up/down arrows to navigate previous commands
- **Header toggle** - Press `Ctrl+H` to show/hide message headers
### Plain Mode
Without `--tui`, the CLI runs in plain mode with simple scrolling output:
```text
> send /queue/test Hello, World!
Sent to /queue/test
> sub /queue/other
Subscribed to: /queue/other
> help
Commands:
send <destination> <message> - Send a message
sub <destination> - Subscribe to a destination
quit - Exit
> quit
Disconnecting...
```
## Running the Examples
Start a local STOMP broker (RabbitMQ with STOMP plugin):
```bash
docker stack deploy -c rabbitmq-stack.yaml rabbitmq
```
Run the quickstart example:
```bash
cargo run --example quickstart
```
Subscribe to multiple queues and print incoming messages (see also [`docs/subscriber-guide.md`](docs/subscriber-guide.md)):
```bash
cargo run --example multi_subscribe
```
Stop the broker:
```bash
docker stack rm rabbitmq
```
## Testing
The library includes comprehensive tests:
```bash
# Run all tests
cargo test
# Run specific test suites
cargo test --test heartbeat_unit # Heartbeat parsing/negotiation
cargo test --test codec_heartbeat # Wire format encoding/decoding
cargo test --test parser_unit # Frame parsing edge cases
cargo test --test codec_fuzz # Randomized chunk splitting
cargo test --test codec_stress # Concurrent stress testing
```
### Integration Tests in CI
The CI workflow includes a smoke integration test that verifies the library
works against a real RabbitMQ broker with STOMP enabled. This test ensures
end-to-end functionality beyond unit tests.
**How it works:**
1. **Broker Setup**: CI builds a Docker image with RabbitMQ 3.11 and the STOMP plugin pre-enabled (see `.github/docker/rabbitmq-stomp/Dockerfile`)
2. **Readiness Checks**: Before running tests, CI performs multi-stage readiness verification:
- Waits for RabbitMQ management API to respond (indicates broker is starting)
- Verifies STOMP plugin is fully enabled via the management API
- Confirms STOMP port 61613 accepts TCP connections
This ensures the broker is truly ready, preventing flaky test failures from timing issues.
3. **Smoke Test**: Runs `tests/stomp_smoke.rs` which:
- Attempts a STOMP CONNECT with retry logic (5 attempts with backoff)
- Verifies the broker responds with CONNECTED frame
- Reports detailed connection diagnostics on failure
4. **Debugging**: If tests fail, CI automatically dumps RabbitMQ logs for troubleshooting
**Running integration tests locally:**
Use the provided helper script which mimics the CI workflow:
```bash
./scripts/test-with-rabbit.sh
```
Or manually with docker swarm:
```bash
# Start RabbitMQ with STOMP
docker stack deploy -c rabbitmq-stack.yaml rabbitmq
# Wait for it to be ready (management UI at http://localhost:15672)
# Then run the smoke test
RUN_STOMP_SMOKE=1 cargo test --test stomp_smoke
# Cleanup
docker stack rm rabbitmq
```
The smoke test is skipped by default unless `RUN_STOMP_SMOKE=1` is set, since it requires an external broker.
## License
This project is licensed under the MIT License. See [LICENSE](LICENSE) for
details.