iridium-stomp 0.3.2

Async STOMP 1.2 client for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# iridium-stomp

[![CI](https://github.com/bsiegfreid/iridium-stomp/actions/workflows/ci.yml/badge.svg)](https://github.com/bsiegfreid/iridium-stomp/actions/workflows/ci.yml)

An asynchronous STOMP 1.2 client library for Rust.

> **Early Development**: This library is heavily tested (300+ unit and fuzz tests) but has not yet been battle-tested in production environments. APIs may change. Use with appropriate caution.

## Design Goals

- **Async-first architecture** — Built on Tokio from the ground up.

- **Correct frame parsing** — Handles arbitrary TCP chunk boundaries, binary
  bodies with embedded NULs, and the full STOMP 1.2 frame format.

- **Automatic heartbeat management** — Negotiates heartbeat intervals per the
  spec, sends heartbeats when idle, and detects missed heartbeats from the
  server.

- **Transparent reconnection** — Stability-aware exponential backoff, automatic
  resubscription, and pending message cleanup on disconnect.

- **Small, explicit API** — One way to do things, clearly documented, easy to
  understand.

- **Production-ready testing** — 150+ tests including fuzz testing, stress
  testing, and regression capture for previously-failing edge cases.

## Quick Start

```rust,no_run
use iridium_stomp::{Connection, Frame, ReceivedFrame};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to a STOMP broker
    let conn = Connection::connect(
        "127.0.0.1:61613",
        "guest",
        "guest",
        Connection::DEFAULT_HEARTBEAT,  // 10 seconds send/receive
    ).await?;

    // Send a message
    let msg = Frame::new("SEND")
        .header("destination", "/queue/test")
        .set_body(b"hello from iridium-stomp".to_vec());
    conn.send_frame(msg).await?;

    // Subscribe to a queue
    let mut subscription = conn
        .subscribe("/queue/test", iridium_stomp::AckMode::Auto)
        .await?;

    // Receive messages using the Stream trait
    use futures::StreamExt;
    while let Some(frame) = subscription.next().await {
        println!("Received: {:?}", frame);
    }

    conn.close().await;
    Ok(())
}
```

## Features

### Heartbeat Negotiation

Heartbeats are negotiated automatically during connection. Use the provided
constants or the `Heartbeat` struct for type-safe configuration:

```rust,ignore
use iridium_stomp::{Connection, Heartbeat};

// Use predefined constants
let conn = Connection::connect(addr, login, pass, Connection::DEFAULT_HEARTBEAT).await?;
let conn = Connection::connect(addr, login, pass, Connection::NO_HEARTBEAT).await?;

// Or use the Heartbeat struct for custom intervals
let hb = Heartbeat::new(5000, 10000);  // send every 5s, expect every 10s
let conn = Connection::connect(addr, login, pass, &hb.to_string()).await?;

// Create from Duration for symmetric intervals
use std::time::Duration;
let hb = Heartbeat::from_duration(Duration::from_secs(15));
```

The library handles the negotiation (taking the maximum of client and server
preferences), sends heartbeats when the connection is idle, and closes the
connection if the server stops responding.

### Subscription Management

Subscribe to destinations with automatic resubscription on reconnect:

```rust,ignore
use iridium_stomp::connection::AckMode;

// Auto-acknowledge (server considers delivered immediately)
let sub = conn.subscribe("/queue/events", AckMode::Auto).await?;

// Client-acknowledge (cumulative)
let sub = conn.subscribe("/queue/jobs", AckMode::Client).await?;

// Client-individual (per-message acknowledgement)
let sub = conn.subscribe("/queue/tasks", AckMode::ClientIndividual).await?;
```

For broker-specific headers (durable subscriptions, selectors, etc.):

```rust,ignore
use iridium_stomp::SubscriptionOptions;
use iridium_stomp::connection::AckMode;

let options = SubscriptionOptions {
    headers: vec![
        ("activemq.subscriptionName".into(), "my-durable-sub".into()),
        ("selector".into(), "priority > 5".into()),
    ],
    durable_queue: None,
};

let sub = conn.subscribe_with_options("/topic/events", AckMode::Client, options).await?;
```

### Cloneable Connection

The `Connection` is cloneable and thread-safe. Multiple tasks can share the
same connection:

```rust,ignore
let conn = Connection::connect(...).await?;
let conn2 = conn.clone();

tokio::spawn(async move {
    conn2.send_frame(some_frame).await.unwrap();
});
```

### Custom CONNECT Headers

Use `ConnectOptions` to customize the STOMP CONNECT frame for broker-specific
requirements like durable subscriptions or virtual hosts:

```rust,ignore
use iridium_stomp::{Connection, ConnectOptions};

let options = ConnectOptions::new()
    .client_id("my-durable-client")     // Required for ActiveMQ durable subscriptions
    .host("/production")                 // Virtual host (RabbitMQ)
    .accept_version("1.1,1.2")          // Version negotiation
    .header("custom-key", "value");     // Broker-specific headers

let conn = Connection::connect_with_options(
    "localhost:61613",
    "guest",
    "guest",
    Connection::DEFAULT_HEARTBEAT,
    options,
).await?;
```

### Receipt Confirmation

Request delivery confirmation from the broker using RECEIPT frames:

```rust,ignore
use iridium_stomp::{Connection, Frame};
use std::time::Duration;

let msg = Frame::new("SEND")
    .header("destination", "/queue/important")
    .receipt("msg-123")  // Request receipt with this ID
    .set_body(b"critical data".to_vec());

// Send and wait for confirmation (with timeout)
conn.send_frame_confirmed(msg, Duration::from_secs(5)).await?;

// Or handle receipts manually
let msg = Frame::new("SEND")
    .header("destination", "/queue/test")
    .receipt("msg-456")
    .set_body(b"data".to_vec());
conn.send_frame_with_receipt(msg).await?;
conn.wait_for_receipt("msg-456", Duration::from_secs(5)).await?;
```

### Connection Error Handling

Connection failures (invalid credentials, server unreachable) are reported immediately:

```rust,ignore
use iridium_stomp::Connection;
use iridium_stomp::connection::ConnError;

match Connection::connect("localhost:61613", "user", "pass", Connection::DEFAULT_HEARTBEAT).await {
    Ok(conn) => {
        // Connected successfully
    }
    Err(ConnError::ServerRejected(err)) => {
        // Authentication failed or server rejected connection
        eprintln!("Server rejected: {}", err.message);
    }
    Err(ConnError::Io(err)) => {
        // Network error (connection refused, timeout, etc.)
        eprintln!("Network error: {}", err);
    }
    Err(err) => {
        eprintln!("Connection failed: {}", err);
    }
}
```

### Server Error Handling

Errors received after connection are surfaced as `ReceivedFrame::Error`:

```rust,ignore
use iridium_stomp::{Connection, ReceivedFrame};

while let Some(received) = conn.next_frame().await {
    match received {
        ReceivedFrame::Frame(frame) => {
            println!("Got {}: {:?}", frame.command, frame.get_header("destination"));
        }
        ReceivedFrame::Error(err) => {
            eprintln!("Server error: {}", err.message);
            if let Some(body) = &err.body {
                eprintln!("Details: {}", body);
            }
            break;
        }
    }
}
```

### Reconnection Backoff

When a connection drops, the library automatically reconnects with exponential
backoff and resubscribes to all active subscriptions. The backoff behavior is
stability-aware: it distinguishes between a long-lived connection that dropped
(transient failure) and a connection that dies immediately after connecting
(persistent failure).

**Stability-aware backoff:**

- If the connection was alive for at least `max(current_backoff, 5)` seconds,
  it is considered stable. On disconnect, backoff resets to 1 second for a fast
  reconnect.
- If the connection dies quickly after establishing (e.g., the broker closes the
  connection during resubscription), backoff doubles on each attempt up to a 30
  second cap: 1s → 2s → 4s → 8s → 16s → 30s.
- Authentication failures during reconnection continue exponential backoff
  without checking connection stability (they do not trigger a backoff reset).

| Scenario | Behavior |
|----------|----------|
| Stable connection drops after minutes | Reconnect in 1s (backoff resets) |
| Broker rejects subscriptions and closes connection | 1s, 2s, 4s, 8s, 16s, 30s cap |
| Authentication failure on reconnect | Exponential backoff (no stability-based reset) |
| Broker unreachable | Exponential backoff up to 30s |

#### Broker-Specific Notes

**Artemis**: When Artemis rejects a SUBSCRIBE due to permissions, it sends a
STOMP ERROR frame but does **not** close the TCP connection. This violates the
[STOMP 1.2 specification](https://stomp.github.io/stomp-specification-1.2.html),
which states: "The server MAY send ERROR frames if something goes wrong. In this
case, it **MUST** then close the connection just after sending the ERROR frame."
Because Artemis keeps the connection open, the reconnect backoff path is never
triggered — errors are delivered inline on the existing connection, potentially
causing a rapid error loop if your application automatically retries
subscriptions. The library surfaces these errors via `ReceivedFrame::Error` for
application-level handling; you may need to implement your own rate limiting or
circuit breaker for Artemis deployments.

**RabbitMQ**: Follows the STOMP spec correctly — ERROR frames are followed by
connection close, which triggers the reconnect backoff as expected.

## CLI

An interactive CLI is included for testing and ad-hoc messaging. Install with
the `cli` feature:

```bash
cargo install iridium-stomp --features cli
```

Or run from source:

```bash
cargo run --features cli --bin stomp -- --help
```

### CLI Usage

```bash
# Connect and subscribe to a queue
stomp -a 127.0.0.1:61613 -s /queue/test

# Connect with custom credentials
stomp -a broker.example.com:61613 -l myuser -p mypass -s /queue/events

# Subscribe to multiple queues
stomp -s /queue/orders -s /queue/notifications

# Enable TUI mode for live monitoring
stomp --tui -a 127.0.0.1:61613 -s /topic/events
```

### TUI Mode

The `--tui` flag enables a full terminal interface with:

- **Activity panel** - Live subscription counts with color coding
- **Message panel** - Scrollable message history with timestamps
- **Heartbeat indicator** - Animated pulse showing connection health
- **Command history** - Up/down arrows to navigate previous commands
- **Header toggle** - Press `Ctrl+H` to show/hide message headers

### Plain Mode

Without `--tui`, the CLI runs in plain mode with simple scrolling output:

```text
> send /queue/test Hello, World!
Sent to /queue/test

> sub /queue/other
Subscribed to: /queue/other

> help
Commands:
  send <destination> <message>  - Send a message
  sub <destination>             - Subscribe to a destination
  quit                          - Exit

> quit
Disconnecting...
```

## Running the Examples

Start a local STOMP broker (RabbitMQ with STOMP plugin):

```bash
docker stack deploy -c rabbitmq-stack.yaml rabbitmq
```

Run the quickstart example:

```bash
cargo run --example quickstart
```

Subscribe to multiple queues and print incoming messages (see also [`docs/subscriber-guide.md`](docs/subscriber-guide.md)):

```bash
cargo run --example multi_subscribe
```

Stop the broker:

```bash
docker stack rm rabbitmq
```

## Testing

The library includes comprehensive tests:

```bash
# Run all tests
cargo test

# Run specific test suites
cargo test --test heartbeat_unit    # Heartbeat parsing/negotiation
cargo test --test codec_heartbeat   # Wire format encoding/decoding
cargo test --test parser_unit       # Frame parsing edge cases
cargo test --test codec_fuzz        # Randomized chunk splitting
cargo test --test codec_stress      # Concurrent stress testing
```

### Integration Tests in CI

The CI workflow includes a smoke integration test that verifies the library
works against a real RabbitMQ broker with STOMP enabled. This test ensures
end-to-end functionality beyond unit tests.

**How it works:**

1. **Broker Setup**: CI builds a Docker image with RabbitMQ 3.11 and the STOMP plugin pre-enabled (see `.github/docker/rabbitmq-stomp/Dockerfile`)

2. **Readiness Checks**: Before running tests, CI performs multi-stage readiness verification:
   - Waits for RabbitMQ management API to respond (indicates broker is starting)
   - Verifies STOMP plugin is fully enabled via the management API
   - Confirms STOMP port 61613 accepts TCP connections
   
   This ensures the broker is truly ready, preventing flaky test failures from timing issues.

3. **Smoke Test**: Runs `tests/stomp_smoke.rs` which:
   - Attempts a STOMP CONNECT with retry logic (5 attempts with backoff)
   - Verifies the broker responds with CONNECTED frame
   - Reports detailed connection diagnostics on failure

4. **Debugging**: If tests fail, CI automatically dumps RabbitMQ logs for troubleshooting

**Running integration tests locally:**

Use the provided helper script which mimics the CI workflow:

```bash
./scripts/test-with-rabbit.sh
```

Or manually with docker swarm:

```bash
# Start RabbitMQ with STOMP
docker stack deploy -c rabbitmq-stack.yaml rabbitmq

# Wait for it to be ready (management UI at http://localhost:15672)
# Then run the smoke test
RUN_STOMP_SMOKE=1 cargo test --test stomp_smoke

# Cleanup
docker stack rm rabbitmq
```

The smoke test is skipped by default unless `RUN_STOMP_SMOKE=1` is set, since it requires an external broker.

## License

This project is licensed under the MIT License. See [LICENSE](LICENSE) for
details.