rsmq-async
An async Rust port of RSMQ — a lightweight message queue built entirely on Redis. No extra infrastructure, no brokers, just Redis.
Wire-compatible with the original JavaScript implementation: messages enqueued by a JS server can be consumed by a Rust worker and vice versa.
How it works
RSMQ stores queues as Redis sorted sets. Each message gets a score based on its delivery time, so delayed and hidden messages are naturally ordered. All queue operations are executed as atomic Lua scripts, making them safe under concurrent access.
- At-least-once delivery — receive a message, process it, then delete it. If your worker crashes mid-flight, the message becomes visible again after its visibility timeout.
- At-most-once delivery — use
pop_messageto receive and delete in one atomic step. - Delayed messages — messages can be held back for a configurable duration before becoming visible.
- Per-message visibility timeouts — received messages stay hidden for a configurable window, giving your worker time to process them.
Quick start
[]
= "16"
use ;
async
Always
delete_messageafter successfully processing — this is what confirms delivery.
Implementations
Three implementations are provided, all behind the same RsmqConnection trait.
| Type | Use when |
|---|---|
Rsmq |
Start here. A single multiplexed connection handles concurrent operations efficiently — no pool overhead, no contention. Right for the vast majority of workloads. |
PooledRsmq |
You're sending large payloads (images, documents, big blobs) and one slow operation blocking the shared connection becomes a problem. |
RsmqSync |
You're in a sync context. Wraps Rsmq in a Tokio runtime. Requires the sync feature. |
Write code against the trait to stay implementation-agnostic:
use RsmqConnection;
async
Connection pool
use ;
let pool_opts = PoolOptions ;
let mut rsmq = new.await?;
Sync wrapper
use ;
let mut rsmq = new.await?;
rsmq.send_message?;
Message types
send_message, receive_message, and pop_message are generic over your message type.
Built-in implementations cover the common cases:
// Send and receive strings
rsmq.send_message.await?;
let msg = rsmq..await?;
// Or raw bytes
rsmq.send_message.await?;
let msg = rsmq..await?;
For custom types, implement TryFrom<RedisBytes> to receive and Into<RedisBytes> to send:
use RedisBytes;
Queue configuration
use Duration;
rsmq.create_queue.await?;
Queue attributes can be updated after creation:
rsmq.set_queue_attributes.await?;
You can inspect queue stats at any time:
let attrs = rsmq.get_queue_attributes.await?;
println!;
Realtime notifications
Set realtime: true in RsmqOptions to have RSMQ publish to {ns}:rt:{qname} on every send_message. Subscribe with redis-rs to wake workers immediately instead of polling.
use RsmqOptions;
let mut rsmq = new.await?;
Use a single subscriber per queue — multiple workers listening on the same SUBSCRIBE channel and racing to call
receive_messageis a common mistake.
Features
| Feature | Default | Description |
|---|---|---|
tokio-comp |
yes | Tokio async runtime support |
smol-comp |
no | smol async runtime support |
sync |
yes | Enables RsmqSync and RsmqConnectionSync |
break-js-comp |
no | Microsecond-precision scores and IDs (see below) |
To use the smol runtime instead of Tokio:
= { = "16", = false, = ["smol-comp"] }
break-js-comp
By default, rsmq-async is wire-compatible with the JS library: message IDs encode microseconds in base36 (matching JS) and queue scores are stored in milliseconds.
Enabling break-js-comp switches scores to full microsecond precision. This gives finer ordering for high-throughput queues but means messages written by a JS server and a Rust server with break-js-comp will have mismatched score units — don't mix the two on the same queue.
= { = "16", = ["break-js-comp"] }
Delivery guarantees
| Pattern | How |
|---|---|
| At least once | receive_message + delete_message after success. Failures re-deliver after visibility timeout. |
| At most once | pop_message atomically dequeues and deletes. |
Development
Start a local Redis with Docker:
Run the tests (sequential is required — tests share the same Redis DB):
To target a different Redis instance:
REDIS_URL=127.0.0.1:6380