§RSMQ in async Rust
Async Rust port of RSMQ — a lightweight message queue built on Redis sorted sets and atomic Lua scripts. No extra brokers, just Redis. Wire-compatible with the original JavaScript implementation, so producers and consumers can mix languages.
See the README for the full guide and examples.
§Quick start
use rsmq_async::{Rsmq, RsmqConnection, RsmqError};

// Wrapper so that `.await` and `?` have an async, fallible context.
async fn quick_start() -> Result<(), RsmqError> {
    let mut rsmq = Rsmq::new(Default::default()).await?;
    rsmq.create_queue("jobs", None, None, None).await?;
    rsmq.send_message("jobs", "hello", None).await?;
    if let Some(msg) = rsmq.receive_message::<String>("jobs", None).await? {
        // ...process msg.message...
        rsmq.delete_message("jobs", &msg.id).await?;
    }
    Ok(())
}
Always call RsmqConnection::delete_message after a successful receive: that is what
confirms delivery. Otherwise the message becomes visible again once the queue's vt (visibility timeout) expires.
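The receive-then-delete rule is easier to see in a toy model. The sketch below is a deliberately simplified in-memory queue with a logical clock; it is not how rsmq_async stores messages in Redis, and the names `ToyQueue`, `send`, `receive`, and `delete` are hypothetical. It only illustrates why an undeleted message comes back after the visibility timeout.

```rust
use std::collections::HashMap;

/// Toy in-memory queue modelling visibility timeouts with a logical clock.
/// Illustration only: this is NOT the library's Redis-backed implementation.
struct ToyQueue {
    /// Visibility timeout, in clock ticks.
    vt: u64,
    next_id: u64,
    /// id -> (payload, tick at which the message becomes visible)
    messages: HashMap<u64, (String, u64)>,
}

impl ToyQueue {
    fn new(vt: u64) -> Self {
        ToyQueue { vt, next_id: 0, messages: HashMap::new() }
    }

    /// Stores a message that is visible immediately.
    fn send(&mut self, payload: &str, now: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.messages.insert(id, (payload.to_string(), now));
        id
    }

    /// Returns the visible message with the lowest id and hides it for `vt` ticks.
    fn receive(&mut self, now: u64) -> Option<(u64, String)> {
        let id = self
            .messages
            .iter()
            .filter(|(_, (_, visible_at))| *visible_at <= now)
            .map(|(id, _)| *id)
            .min()?;
        let entry = self.messages.get_mut(&id)?;
        entry.1 = now + self.vt;
        Some((id, entry.0.clone()))
    }

    /// Confirms delivery by removing the message for good.
    fn delete(&mut self, id: u64) -> bool {
        self.messages.remove(&id).is_some()
    }
}
```

With `ToyQueue::new(30)`, a message received at tick 0 is hidden until tick 30 and is handed out again at that point unless `delete` was called in between.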
§Implementations
All three implement the same RsmqConnection trait, so write code against the trait
to stay implementation-agnostic.
- Rsmq: multiplexed connection. Start here. Right for almost all workloads.
- PooledRsmq: connection pool. Use for large payloads where a slow op would block others.
- RsmqSync: sync wrapper for non-async contexts (requires the sync feature).
§Realtime notifications
Set realtime: true in RsmqOptions and RSMQ will PUBLISH to {ns}:rt:{qname} on every
send_message. Subscribe with redis-rs to wake workers immediately instead of polling.
Use a single subscriber per queue — multiple workers racing on SUBSCRIBE is a common mistake.
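The channel naming above can be sketched as two small helpers, which a subscriber could use to build the channel to subscribe to and to map an incoming notification back to its queue. This is only an illustration: `realtime_channel` and `queue_from_channel` are hypothetical names, not part of the crate, and the subscription itself (done with redis-rs) is left out.

```rust
/// Builds the Pub/Sub channel name in the `{ns}:rt:{qname}` form described above.
/// Hypothetical helper, not part of rsmq_async.
fn realtime_channel(ns: &str, qname: &str) -> String {
    format!("{}:rt:{}", ns, qname)
}

/// Recovers the queue name from a channel name, or returns `None` if the
/// channel does not belong to the given namespace.
fn queue_from_channel<'a>(ns: &str, channel: &'a str) -> Option<&'a str> {
    channel.strip_prefix(ns)?.strip_prefix(":rt:")
}
```

Assuming the namespace is `rsmq` (an assumed value for `ns`), `realtime_channel("rsmq", "jobs")` yields `rsmq:rt:jobs`, and `queue_from_channel("rsmq", "rsmq:rt:jobs")` yields `Some("jobs")`.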
§Custom message types
Rsmq::send_message takes any Into<RedisBytes>; Rsmq::receive_message / Rsmq::pop_message
take any TryFrom<RedisBytes, Error = Vec<u8>>. Built-in impls cover String, &str, Vec<u8>,
and &[u8]. For your own types, implement the conversions:
use rsmq_async::RedisBytes;

struct MyPayload(Vec<u8>);

impl From<MyPayload> for RedisBytes {
    fn from(p: MyPayload) -> RedisBytes { RedisBytes::from(p.0) }
}

impl TryFrom<RedisBytes> for MyPayload {
    type Error = Vec<u8>;
    fn try_from(b: RedisBytes) -> Result<Self, Vec<u8>> {
        Ok(MyPayload(b.into_bytes()))
    }
}
§Cargo features
- tokio-comp (default): Tokio runtime support.
- smol-comp: smol runtime support. Disable defaults to use it standalone.
- sync (default): enables RsmqSync and RsmqConnectionSync.
- serde (default): enables Json<T>, RsmqJsonExt, and the RsmqError::JsonError variant. Pulls in serde and serde_json.
- worker (default): enables Worker, a polling/heartbeat/realtime-aware async worker helper with a queue-name router. Tokio-only.
- break-js-comp: full microsecond-precision scores. Off by default to stay wire-compatible with the JS library; don't mix a break-js-comp Rust producer with a JS server on the same queue.
Structs§
- DecodeError: Error wrapping a message-decoding failure; used so handlers can surface a typed error instead of a panic when TryFrom<RedisBytes> rejects the bytes.
- Json: Newtype wrapping a T so it can be sent and received as JSON through the existing RsmqConnection API.
- PoolOptions
- PooledRsmq
- RedisBytes: Internal value representing the Redis bytes. It converts to String and Vec<u8> via TryFrom, and from String, &str, Vec<u8>, and &[u8] via From.
- RedisConnectionManager
- Rsmq
- RsmqMessage: A new RSMQ message. You will get this when using the pop_message or receive_message methods.
- RsmqOptions: Options for creating a new RSMQ instance.
- RsmqQueueAttributes: Struct defining a queue. Its attributes are set by create_queue and set_queue_attributes.
- RsmqSync
- Worker: Polls registered queues and dispatches to per-queue handlers. Build via Worker::builder.
- WorkerBuilder: Builds a Worker.
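The case where TryFrom<RedisBytes> rejects the bytes, mentioned under DecodeError, can be illustrated with a fallible conversion. This is a minimal sketch under stated assumptions: the `RedisBytes` type below is a local stand-in with only the `From<Vec<u8>>` and `into_bytes` operations, defined so the example compiles on its own, and `JobId` is a hypothetical payload type. On failure it hands back the original bytes, matching an `Error = Vec<u8>` bound.

```rust
use std::convert::TryFrom;

/// Local stand-in for `rsmq_async::RedisBytes` so this sketch is self-contained.
struct RedisBytes(Vec<u8>);

impl From<Vec<u8>> for RedisBytes {
    fn from(bytes: Vec<u8>) -> Self {
        RedisBytes(bytes)
    }
}

impl RedisBytes {
    fn into_bytes(self) -> Vec<u8> {
        self.0
    }
}

/// Hypothetical payload: a job id encoded as exactly 8 big-endian bytes.
#[derive(Debug, PartialEq)]
struct JobId(u64);

impl From<JobId> for RedisBytes {
    fn from(id: JobId) -> RedisBytes {
        RedisBytes::from(id.0.to_be_bytes().to_vec())
    }
}

impl TryFrom<RedisBytes> for JobId {
    type Error = Vec<u8>;

    fn try_from(b: RedisBytes) -> Result<Self, Vec<u8>> {
        let bytes = b.into_bytes();
        match <[u8; 8]>::try_from(bytes.as_slice()) {
            // Exactly 8 bytes: decode them as a big-endian u64.
            Ok(arr) => Ok(JobId(u64::from_be_bytes(arr))),
            // Any other length: reject and return the raw bytes to the caller.
            Err(_) => Err(bytes),
        }
    }
}
```

Returning the undecoded bytes rather than dropping them lets the caller log or dead-letter the raw payload.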
Enums§
- RsmqError: This is the error type for any operation with this library. It derives ThisError.
Traits§
- RsmqConnection
- RsmqConnectionSync
- RsmqJsonExt: Extension trait adding send_json / receive_json / pop_json to any RsmqConnection.
- RsmqJsonExtSync: Sync counterpart of RsmqJsonExt. Available with both serde and sync features.
Type Aliases§
- RsmqResult: This is an alias of Result<T, RsmqError> for simplicity.