Crate mqb

Source
Expand description

§mqb

This library provides lock free in memory message queue broker.

Similar to mpmc channels with a single queue size. There are two variants of broker: bounded and unbounded. The bounded variant has a limit on the number of messages that the broker can store, and if this limit is reached, trying to send another message will wait until a message is received from the subscriber, the limit is global for all “sub-channels”. An unbounded channel has an infinite capacity, so the send method will always complete immediately.

§Unbounded example

use mqb::MessageQueueBroker;
use futures::future::FutureExt;

type Tag = i32;
type Message = i32;

#[tokio::main]
async fn main() {
    let mqb = MessageQueueBroker::<Tag, Message>::unbounded();
    let sub1 = mqb.subscribe(1);
    let sub2 = mqb.subscribe(2);
    let sub2_another = mqb.subscribe(2);
    let sub2_clone = sub2.clone();

    let mut tasks = Vec::new();
    tasks.push(async move { assert_eq!(sub1.recv().await.unwrap(), 1) }.boxed());
    tasks.push(async move { assert!(sub2.recv().await.is_ok()) }.boxed());
    tasks.push(async move { assert!(sub2_another.recv().await.is_ok()) }.boxed());
    tasks.push(async move { assert!(sub2_clone.recv().await.is_ok()) }.boxed());

    mqb.send(&1, 1).await.unwrap();
    mqb.send(&2, 1).await.unwrap();
    mqb.send(&2, 2).await.unwrap();
    mqb.send(&2, 3).await.unwrap();
    assert!(mqb.send(&3, 1).await.is_err());

    futures::future::join_all(tasks).await;
}

§Bounded example

use mqb::MessageQueueBroker;
use futures::future::FutureExt;

type Tag = i32;
type Message = i32;

#[tokio::main]
async fn main() {
    let mqb = MessageQueueBroker::<Tag, Message>::bounded(2);
    let sub1 = mqb.subscribe(1);
    let sub2 = mqb.subscribe(2);

    let mut tasks = Vec::new();
    tasks.push(async move { assert_eq!(sub1.recv().await.unwrap(), 1) }.boxed());
    tasks.push(async move { assert!(sub2.recv().await.is_ok()) }.boxed());

    mqb.send(&1, 1).await.unwrap();
    mqb.send(&2, 1).await.unwrap();
    assert!(mqb.try_send(&2, 2).unwrap_err().is_full());
    assert!(mqb.try_send(&3, 1).unwrap_err().is_closed());

    futures::future::join_all(tasks).await;
}

Structs§

MessageQueueBroker
Lock free in memory message queue broker for sending values between asynchronous tasks by tags.
RecvError
SendError
Subscriber
Subscriber to the tagged queue created by subscribe() function.

Enums§

TryRecvError
TrySendError