§RSMQ in async Rust
An async Rust port of RSMQ. RSMQ is a simple Redis-backed queue system that works with any Redis v2.6+. It provides the same methods as the original at https://github.com/smrchy/rsmq
§Traits
This library provides two core traits for interacting with Redis message queues:
- RsmqConnection: The async trait that defines all queue operations. Must be imported with use rsmq_async::RsmqConnection;
- RsmqConnectionSync: The synchronous version of the trait, available with the "sync" feature. Must be imported with use rsmq_async::RsmqConnectionSync;
§Implementations
Three main implementations are provided:
- Rsmq: The preferred implementation, using a multiplexed Redis connection
- PooledRsmq: Uses a connection pool for large messages
- RsmqSync: A synchronous wrapper (requires the "sync" feature)
§Example
let message = rsmq.receive_message::<String>("myqueue", None).await?;
if let Some(message) = message {
    rsmq.delete_message("myqueue", &message.id).await?;
}
§Installation
Check https://crates.io/crates/rsmq_async
§Example
use rsmq_async::{Rsmq, RsmqConnection};

async fn it_works() {
    // Connect using the default options.
    let mut rsmq = Rsmq::new(Default::default())
        .await
        .expect("connection failed");

    rsmq.create_queue("myqueue", None, None, None)
        .await
        .expect("failed to create queue");

    rsmq.send_message("myqueue", "testmessage", None)
        .await
        .expect("failed to send message");

    let message = rsmq
        .receive_message::<String>("myqueue", None)
        .await
        .expect("cannot receive message");

    // Delete the message only after it has been received successfully.
    if let Some(message) = message {
        rsmq.delete_message("myqueue", &message.id)
            .await
            .expect("failed to delete message");
    }
}
§Realtime
When initializing RSMQ you can enable the realtime PUBLISH for new messages. For every new message sent to
RSMQ via send_message, a Redis PUBLISH is issued to {rsmq.ns}:rt:{qname}, so you can subscribe to that channel
using the redis-rs library directly.
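The channel name pattern above can be sketched as a small helper. This is only an illustration: realtime_channel is a hypothetical function that is not part of rsmq_async, and the namespace "rsmq" in the usage note is an assumed value, so substitute whatever namespace your options configure.

```rust
/// Builds the channel name that follows the `{rsmq.ns}:rt:{qname}` pattern.
///
/// Hypothetical helper for illustration; it is not provided by rsmq_async.
pub fn realtime_channel(ns: &str, qname: &str) -> String {
    format!("{}:rt:{}", ns, qname)
}
```

For example, realtime_channel("rsmq", "myqueue") produces the string you would pass to a SUBSCRIBE issued through redis-rs, assuming the namespace is "rsmq".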
§How to use the realtime option
Apart from the Redis PUBLISH command issued when a new message is sent to RSMQ, nothing else happens. Your app can use
the Redis SUBSCRIBE command to be notified of new messages and then issue a receive_message. However, make sure not
to listen for new messages with SUBSCRIBE from multiple workers, to prevent multiple simultaneous receive_message
calls.
§Time Precision
By default this library keeps compatibility with the JS counterpart. If you need
sub-second precision, or you send many messages very close together and need to
track them with better than one-second precision, you can enable the
break-js-comp feature in your Cargo.toml like this:
rsmq_async = { version = "11", features = [ "break-js-comp" ] }
§Guarantees
If you want an "at least once delivery" guarantee, receive messages with receive_message and, once a message has been processed successfully, delete it with delete_message.
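The receive, process, then delete order can be sketched with a toy in-memory model. Everything here is a stand-in chosen for illustration: Queue, MemoryQueue, process_one and expire_visibility are hypothetical names, the code is synchronous, and the real operations live on the async RsmqConnection trait. The point is only that a message is deleted after the handler succeeds, so a failed attempt leaves it available for redelivery.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the two queue operations used below.
/// The real trait is `rsmq_async::RsmqConnection`, whose methods are async.
pub trait Queue {
    /// Returns the next visible message as `(id, body)` and hides it.
    fn receive_message(&mut self) -> Option<(String, String)>;
    /// Permanently removes a message by id.
    fn delete_message(&mut self, id: &str);
}

/// Receives one message and deletes it only if `handler` succeeds.
/// Returns `None` when the queue had no visible message, otherwise
/// whether the message was handled (and therefore deleted).
pub fn process_one<Q, F>(queue: &mut Q, handler: F) -> Option<bool>
where
    Q: Queue,
    F: FnOnce(&str) -> Result<(), String>,
{
    let (id, body) = queue.receive_message()?;
    match handler(&body) {
        Ok(()) => {
            queue.delete_message(&id);
            Some(true)
        }
        // Not deleted: the message can be delivered again later.
        Err(_) => Some(false),
    }
}

/// Toy in-memory queue that models hidden (in-flight) messages.
pub struct MemoryQueue {
    visible: VecDeque<(String, String)>,
    in_flight: Vec<(String, String)>,
}

impl MemoryQueue {
    pub fn new() -> Self {
        MemoryQueue { visible: VecDeque::new(), in_flight: Vec::new() }
    }

    pub fn send(&mut self, id: &str, body: &str) {
        self.visible.push_back((id.to_string(), body.to_string()));
    }

    /// Simulates the visibility timeout expiring: messages that were
    /// received but never deleted become visible again.
    pub fn expire_visibility(&mut self) {
        self.visible.extend(self.in_flight.drain(..));
    }

    /// Number of messages still stored (visible or in flight).
    pub fn len(&self) -> usize {
        self.visible.len() + self.in_flight.len()
    }
}

impl Queue for MemoryQueue {
    fn receive_message(&mut self) -> Option<(String, String)> {
        let msg = self.visible.pop_front()?;
        self.in_flight.push(msg.clone());
        Some(msg)
    }

    fn delete_message(&mut self, id: &str) {
        self.in_flight.retain(|(mid, _)| mid.as_str() != id);
        self.visible.retain(|(mid, _)| mid.as_str() != id);
    }
}
```

Because a message that was processed but not yet deleted may be delivered again, handlers in this style should tolerate seeing the same message more than once.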
§Connection Pool
If you want to use a connection pool, use PooledRsmq
instead of Rsmq. It implements the RsmqConnection trait,
just like the normal Rsmq.
If you want to accept either implementation, accept the trait RsmqConnection.
§Executor compatibility
By default it instructs the redis-rs library to enable both async-std and Tokio compatibility, choosing Tokio
if it is available and async-std if not. To choose explicitly, change the Cargo.toml
definition to
rsmq_async = { version = "9", default-features = false, features = ["tokio-comp"] }
Where "tokio-comp" can also be "async-std-comp".
§Rsmq vs PooledRsmq
In almost all workloads you will probably prefer the Rsmq object, as it works with a multiplexed connection.
For specific workloads where you send a lot of data (images, documents, big blobs), you might prefer to
use PooledRsmq and configure it with PoolOptions.
They both use redis::aio::MultiplexedConnection, but the pooled connection can be configured to spawn several
of those, so one operation won't block another.
§Response types
There are 3 functions that take generic types:
- pop_message and receive_message: The type of the received message is RsmqMessage<E>, where E: TryFrom<RedisBytes, Error = Vec<u8>>. So, if you have a custom type, you can implement the trait TryFrom<RedisBytes> for YourCustomType and use it like: rsmq.receive_message::<YourCustomType>("myqueue", None). Implementations are provided for String and Vec<u8>.
- send_message: The message to send needs to implement Into<RedisBytes> + Send, so you will need to implement the trait for your type. You can check the implementations for the type RedisBytes to see how we did it. Implementations are provided for String, &str and Vec<u8>.
The reason for this design is that strings in Rust are very convenient for JSON messages, so always returning a Vec<u8>
may not be the most ergonomic solution. At the same time, the library ships some ready-made implementations, so you
can use them with your type or, if you are sending, say, images, call the method like
rsmq.receive_message::<Vec<u8>>("myqueue", None)
and transform the result to your type later. (Or implement
TryFrom<RedisBytes>
for your type and the transformation will be automatic.)
§Example for implementing a custom type
impl TryFrom<RedisBytes> for String {
    type Error = Vec<u8>; // Always set Error as Vec<u8>

    fn try_from(bytes: RedisBytes) -> Result<Self, Self::Error> {
        String::from_utf8(bytes.0).map_err(|e| e.into_bytes())
    }
}
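The same pattern applied to an application type might look like the sketch below. To keep it compilable on its own it declares a local stand-in RedisBytes newtype; with the real crate the impls would target rsmq_async::RedisBytes instead, and the way its inner bytes are accessed may differ from the public field assumed here. Temperature is a hypothetical type invented for the example.

```rust
use std::convert::TryFrom;

/// Local stand-in for `rsmq_async::RedisBytes` so this sketch compiles alone.
/// The public field is an assumption made for the example.
pub struct RedisBytes(pub Vec<u8>);

/// Hypothetical application type carried as a queue message.
#[derive(Debug, PartialEq)]
pub struct Temperature {
    pub celsius: i32,
}

/// Receiving side: decode the raw bytes into the custom type.
impl TryFrom<RedisBytes> for Temperature {
    // The error hands the raw bytes back, as in the String example above.
    type Error = Vec<u8>;

    fn try_from(bytes: RedisBytes) -> Result<Self, Self::Error> {
        // Expect a UTF-8 decimal integer such as "21".
        let parsed = std::str::from_utf8(&bytes.0)
            .ok()
            .and_then(|text| text.trim().parse::<i32>().ok());
        match parsed {
            Some(celsius) => Ok(Temperature { celsius }),
            None => Err(bytes.0),
        }
    }
}

/// Sending side: encode the custom type as bytes.
impl From<Temperature> for RedisBytes {
    fn from(t: Temperature) -> Self {
        RedisBytes(t.celsius.to_string().into_bytes())
    }
}
```

Returning the original bytes on failure lets the caller log or re-route a message that could not be decoded instead of losing it.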
Structs§
- PoolOptions
- PooledRsmq
- RedisBytes: Internal value representing the redis bytes. It implements TryFrom to String and Vec<u8>, and From String, &str, Vec<u8> and &[u8] to itself.
- RedisConnectionManager
- Rsmq
- RsmqMessage: A new RSMQ message. You will get this when using the pop_message or receive_message methods.
- RsmqOptions: Options for creating a new RSMQ instance.
- RsmqQueueAttributes: Struct defining a queue. The attributes are set on "create_queue" and "set_queue_attributes".
- RsmqSync
Enums§
- RsmqError: This is the error type for any operation with this library. It derives ThisError.
Traits§
Type Aliases§
- RsmqResult: This is an alias of Result<T, RsmqError> for simplicity.