Crate rsmq_async

§RSMQ in async Rust

Async Rust port of RSMQ — a lightweight message queue built on Redis sorted sets and atomic Lua scripts. No extra brokers, just Redis. Wire-compatible with the original JavaScript implementation, so producers and consumers can mix languages.

See the README for the full guide and examples.

§Quick start

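use rsmq_async::{Rsmq, RsmqConnection, RsmqError};

// Requires the tokio crate with its macros and a runtime feature enabled.
#[tokio::main]
async fn main() -> Result<(), RsmqError> {
    // Connect using the default RsmqOptions.
    let mut rsmq = Rsmq::new(Default::default()).await?;

    rsmq.create_queue("jobs", None, None, None).await?;
    rsmq.send_message("jobs", "hello", None).await?;

    if let Some(msg) = rsmq.receive_message::<String>("jobs", None).await? {
        // ...process msg.message...
        // Deleting the message confirms delivery.
        rsmq.delete_message("jobs", &msg.id).await?;
    }

    Ok(())
}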

Always call RsmqConnection::delete_message once a received message has been processed successfully; the delete is what confirms delivery. Otherwise the message becomes visible again once the queue’s visibility timeout (vt) expires.
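The receive/delete contract can be illustrated with a toy in-memory model. This is only a sketch of the semantics described above, not the crate's implementation: ToyQueue and its methods are hypothetical names, and time is passed in explicitly (in milliseconds) so the behaviour is deterministic.

```rust
use std::collections::HashMap;

/// Toy in-memory model of receive/delete semantics (hypothetical, not the crate's code).
struct ToyQueue {
    /// Visibility timeout applied to a message each time it is received.
    vt_ms: u64,
    next_id: u64,
    /// id -> (payload, time at which the message becomes visible)
    messages: HashMap<u64, (String, u64)>,
}

impl ToyQueue {
    fn new(vt_ms: u64) -> Self {
        ToyQueue { vt_ms, next_id: 0, messages: HashMap::new() }
    }

    /// Stores a message that is visible immediately.
    fn send(&mut self, payload: &str, now_ms: u64) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.messages.insert(id, (payload.to_string(), now_ms));
        id
    }

    /// Returns the visible message with the earliest visibility time
    /// and hides it for `vt_ms`. The message is NOT removed.
    fn receive(&mut self, now_ms: u64) -> Option<(u64, String)> {
        let id = self
            .messages
            .iter()
            .filter(|(_, (_, visible_at))| *visible_at <= now_ms)
            .min_by_key(|(id, (_, visible_at))| (*visible_at, **id))
            .map(|(id, _)| *id)?;
        let entry = self.messages.get_mut(&id)?;
        entry.1 = now_ms + self.vt_ms;
        Some((id, entry.0.clone()))
    }

    /// Removes the message for good; this is the step that confirms delivery.
    fn delete(&mut self, id: u64) -> bool {
        self.messages.remove(&id).is_some()
    }
}
```

In this model, a message that is received but never deleted is handed out again by the first receive call made after vt_ms has elapsed, which is why handlers should be safe to run more than once for the same message.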

§Implementations

All three implement the same RsmqConnection trait, so write code against the trait to stay implementation-agnostic.

  • Rsmq — multiplexed connection. Start here. Right for almost all workloads.
  • PooledRsmq — connection pool. Use for large payloads where a slow op would block others.
  • RsmqSync — sync wrapper for non-async contexts (requires the sync feature).

§Realtime notifications

Set realtime: true in RsmqOptions and RSMQ will PUBLISH to {ns}:rt:{qname} on every send_message. Subscribe with redis-rs to wake workers immediately instead of polling. Use a single subscriber per queue: Redis pub/sub delivers every notification to every subscriber, so several subscribed workers would all wake up and race for the same message, which is a common mistake.
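As a small illustration of the channel naming, the helper below builds the name in the {ns}:rt:{qname} shape. The function name realtime_channel is hypothetical, and the namespace value "rsmq" used in the usage note is an assumption; use whatever ns your RsmqOptions actually sets.

```rust
/// Builds a pub/sub channel name in the `{ns}:rt:{qname}` shape.
/// Hypothetical helper for illustration; it is not part of the crate.
fn realtime_channel(ns: &str, qname: &str) -> String {
    format!("{}:rt:{}", ns, qname)
}
```

With an assumed namespace of "rsmq", realtime_channel("rsmq", "jobs") yields "rsmq:rt:jobs", which is the channel the single subscriber for the jobs queue would listen on.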

§Custom message types

Rsmq::send_message takes any Into<RedisBytes>; Rsmq::receive_message / Rsmq::pop_message take any TryFrom<RedisBytes, Error = Vec<u8>>. Built-in impls cover String, &str, Vec<u8>, and &[u8]. For your own types, implement the conversions:

use rsmq_async::RedisBytes;

struct MyPayload(Vec<u8>);

impl From<MyPayload> for RedisBytes {
    fn from(p: MyPayload) -> RedisBytes { RedisBytes::from(p.0) }
}

impl TryFrom<RedisBytes> for MyPayload {
    type Error = Vec<u8>;
    fn try_from(b: RedisBytes) -> Result<Self, Vec<u8>> {
        Ok(MyPayload(b.into_bytes()))
    }
}
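The conversion above always succeeds. A fallible variant might look like the sketch below, which returns the raw bytes as the error when they cannot be decoded. To keep it compilable on its own, RawBytes is a hypothetical stand-in for RedisBytes, and Point with its 8-byte big-endian layout is an invented payload format.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for `RedisBytes`, so the sketch compiles without the crate.
struct RawBytes(Vec<u8>);

/// Hypothetical payload: two big-endian u32 values, exactly 8 bytes on the wire.
#[derive(Debug, PartialEq)]
struct Point {
    x: u32,
    y: u32,
}

impl From<Point> for RawBytes {
    fn from(p: Point) -> RawBytes {
        let mut out = Vec::with_capacity(8);
        out.extend_from_slice(&p.x.to_be_bytes());
        out.extend_from_slice(&p.y.to_be_bytes());
        RawBytes(out)
    }
}

impl TryFrom<RawBytes> for Point {
    // Same error type as in the bound above: the undecodable bytes are handed back.
    type Error = Vec<u8>;

    fn try_from(b: RawBytes) -> Result<Self, Vec<u8>> {
        let bytes = b.0;
        if bytes.len() != 8 {
            // Wrong length: return the raw bytes so the caller can log or requeue them.
            return Err(bytes);
        }
        let x = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
        let y = u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
        Ok(Point { x, y })
    }
}
```

Returning the original bytes on failure means nothing is lost when decoding is rejected; the caller still holds the payload and can decide what to do with it.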

§Cargo features

  • tokio-comp (default) — Tokio runtime support.
  • smol-comp — smol runtime support. Disable defaults to use it standalone.
  • sync (default) — enables RsmqSync and RsmqConnectionSync.
  • serde (default) — enables Json<T>, RsmqJsonExt, and the RsmqError::JsonError variant. Pulls in serde and serde_json.
  • worker (default) — enables Worker, a polling/heartbeat/realtime-aware async worker helper with a queue-name router. Tokio-only.
  • break-js-comp — full microsecond-precision scores. Off by default to stay wire-compatible with the JS library; don’t mix a break-js-comp Rust producer with a JS server on the same queue.

Structs§

DecodeError
Error wrapping a message-decoding failure; used so handlers can surface a typed error instead of a panic when TryFrom<RedisBytes> rejects the bytes.
Json
Newtype wrapping a T so it can be sent and received as JSON through the existing RsmqConnection API.
PoolOptions
PooledRsmq
RedisBytes
Internal value representing raw Redis bytes. It can be built from String, &str, Vec<u8>, and &[u8] via From, and converted into String and Vec<u8> via TryFrom.
RedisConnectionManager
Rsmq
RsmqMessage
A new RSMQ message. This is what the pop_message and receive_message methods return.
RsmqOptions
Options for creating a new RSMQ instance.
RsmqQueueAttributes
Struct defining a queue’s attributes. They are set by create_queue and set_queue_attributes.
RsmqSync
Worker
Polls registered queues and dispatches to per-queue handlers. Build via Worker::builder.
WorkerBuilder
Builds a Worker.

Enums§

RsmqError
This is the error type for any operation with this library. It derives ThisError.

Traits§

RsmqConnection
RsmqConnectionSync
RsmqJsonExt
Extension trait adding send_json / receive_json / pop_json to any RsmqConnection.
RsmqJsonExtSync
Sync counterpart of RsmqJsonExt. Available with both serde and sync features.

Type Aliases§

RsmqResult
An alias of Result<T, RsmqError>, provided for convenience.