Crate mqb

Crate mqb 

Source
Expand description

§mqb

This library provides lock free in memory message queue broker.

Similar to mpmc channels with a single queue size. There are two variants of broker: bounded and unbounded. The bounded variant has a limit on the number of messages that the broker can store, and if this limit is reached, trying to send another message will wait until a message is received from the subscriber, the limit is global for all “sub-channels”. An unbounded channel has an infinite capacity, so the send method will always complete immediately.

§Unbounded example

use mqb::MessageQueueBroker;
use futures::future::FutureExt;

type Tag = i32;
type Message = i32;

#[tokio::main]
async fn main() {
    let mqb = MessageQueueBroker::<Tag, Message>::unbounded();
    let sub1 = mqb.subscribe(1);
    let sub2 = mqb.subscribe(2);
    let sub2_another = mqb.subscribe(2);
    let sub2_clone = sub2.clone();

    let mut tasks = Vec::new();
    tasks.push(async move { assert_eq!(sub1.recv().await.unwrap(), 1) }.boxed());
    tasks.push(async move { assert!(sub2.recv().await.is_ok()) }.boxed());
    tasks.push(async move { assert!(sub2_another.recv().await.is_ok()) }.boxed());
    tasks.push(async move { assert!(sub2_clone.recv().await.is_ok()) }.boxed());

    mqb.send(&1, 1).await.unwrap();
    mqb.send(&2, 1).await.unwrap();
    mqb.send(&2, 2).await.unwrap();
    mqb.send(&2, 3).await.unwrap();
    assert!(mqb.send(&3, 1).await.is_err());

    futures::future::join_all(tasks).await;
}

§Bounded example

use mqb::MessageQueueBroker;
use futures::future::FutureExt;

type Tag = i32;
type Message = i32;

#[tokio::main]
async fn main() {
    let mqb = MessageQueueBroker::<Tag, Message>::bounded(2);
    let sub1 = mqb.subscribe(1);
    let sub2 = mqb.subscribe(2);

    let mut tasks = Vec::new();
    tasks.push(async move { assert_eq!(sub1.recv().await.unwrap(), 1) }.boxed());
    tasks.push(async move { assert!(sub2.recv().await.is_ok()) }.boxed());

    mqb.send(&1, 1).await.unwrap();
    mqb.send(&2, 1).await.unwrap();
    assert!(mqb.try_send(&2, 2).unwrap_err().is_full());
    assert!(mqb.try_send(&3, 1).unwrap_err().is_closed());

    futures::future::join_all(tasks).await;
}

Structs§

MessageQueueBroker
Lock free in memory message queue broker for sending values between asynchronous tasks by tags.
Recv
A future returned by Subscriber::recv().
RecvError
An error returned from Subscriber::recv().
Send
A future returned by MessageQueueBroker::send().
SendError
An error returned from MessageQueueBroker::send().
Subscriber
Subscriber to the tagged queue created by MessageQueueBroker::subscribe() function.

Enums§

TryRecvError
An error returned from Subscriber::try_recv().
TrySendError
An error returned from MessageQueueBroker::try_send().