Crate trotcast

A multi-producer, multi-consumer broadcast channel implementation.

This crate provides a broadcast channel where multiple Channel handles can send messages and every receiver gets a copy of every message sent.

The no_std version uses a spin::Mutex.

§Overview

There are just two structures you need to consider:

§Channel

A channel handle for the broadcast channel that allows sending messages to all receivers.

You can clone channels. If you need another Receiver, you can call Channel::spawn_rx.

Channels will lock a RwLock when writing. I tried to not do this. If this behavior is undesirable, and you are aware of a better solution, please let me know!

§Receiver

A receiver handle for the broadcast channel that allows consuming messages.

Note: a channel should always have at least one active receiver. If any receiver stops reading, the channel will eventually block on that non-reading receiver.

However, other receivers will still be able to receive prior messages until they reach the state of the non-reading receiver.
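The blocking behavior described above can be pictured with a small single-threaded toy model. This is only a sketch under stated assumptions, not trotcast's actual implementation: ToyBroadcast, try_send, and try_recv are hypothetical names invented for this illustration, and the model assumes a bounded buffer whose slots are freed only once every receiver has read them.

```rust
use std::collections::VecDeque;

/// Toy single-threaded model of a bounded broadcast buffer.
/// NOT trotcast's implementation: every name here is hypothetical.
struct ToyBroadcast<T> {
    capacity: usize,
    buf: VecDeque<T>,    // messages not yet read by every receiver
    base: usize,         // sequence number of buf[0]
    cursors: Vec<usize>, // next sequence number each receiver will read
}

impl<T: Clone> ToyBroadcast<T> {
    fn new(capacity: usize, receivers: usize) -> Self {
        Self {
            capacity,
            buf: VecDeque::new(),
            base: 0,
            cursors: vec![0; receivers],
        }
    }

    /// Hands the message back as `Err` when the buffer is full.
    fn try_send(&mut self, msg: T) -> Result<(), T> {
        if self.buf.len() == self.capacity {
            return Err(msg);
        }
        self.buf.push_back(msg);
        Ok(())
    }

    /// Next unread message for receiver `id`, or `None` if it is caught up.
    fn try_recv(&mut self, id: usize) -> Option<T> {
        let idx = self.cursors[id] - self.base;
        let msg = self.buf.get(idx)?.clone();
        self.cursors[id] += 1;
        // Free the slots that every receiver has now read.
        let slowest = *self.cursors.iter().min().unwrap();
        while self.base < slowest {
            self.buf.pop_front();
            self.base += 1;
        }
        Some(msg)
    }
}

fn main() {
    let mut ch = ToyBroadcast::new(2, 2);
    ch.try_send(1).unwrap();
    ch.try_send(2).unwrap();

    // Receiver 0 reads everything that was sent; receiver 1 reads nothing.
    assert_eq!(ch.try_recv(0), Some(1));
    assert_eq!(ch.try_recv(0), Some(2));
    assert_eq!(ch.try_recv(0), None);

    // The buffer is still full because receiver 1 has not read anything.
    assert_eq!(ch.try_send(3), Err(3));

    // Once receiver 1 reads a message, a slot is freed and sending works again.
    assert_eq!(ch.try_recv(1), Some(1));
    assert!(ch.try_send(3).is_ok());
}
```

In this model the fast receiver can drain everything already sent, but new sends fail until the slowest receiver advances. A blocking send would wait at that point instead of returning an error.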

You can clone receivers. If you need another Channel, you can call Receiver::clone_channel.

Receivers will not lock any Mutex or RwLock.

§Example

use trotcast::prelude::*;
// Create a broadcast channel with a capacity of 2
let tx = Channel::new(2);

// Clone the channel, and create receivers for multiple producers/consumers
let tx2 = tx.clone();
let mut rx1 = tx.spawn_rx();
let mut rx2 = rx1.clone();

std::thread::spawn(move || {
    tx.blocking_send(1).unwrap();
    tx.blocking_send(2).unwrap();
});

std::thread::spawn(move || {
    tx2.blocking_send(3).unwrap();
    tx2.blocking_send(4).unwrap();
});

// Using crossbeam to validate the example
let (tx_vals, receiver_vals) = crossbeam_channel::unbounded();

let recv_1 = std::thread::spawn({
    let tx = tx_vals.clone();
    move || {
        let mut count = 0;
        // Collect messages from first receiver
        while let Ok(msg) = rx1.recv() {
            tx.send((1, msg)).unwrap();
            count += 1;
            if count == 4 {
                break;
            }
        }
    }
});

let recv_2 = std::thread::spawn({
    let tx = tx_vals.clone();
    move || {
        let mut count = 0;
        // Collect messages from second receiver
        while let Ok(msg) = rx2.recv() {
            tx.send((2, msg)).unwrap();
            count += 1;
            if count == 4 {
                break;
            }
        }
    }
});
// Wait for both receiver threads to finish
recv_1.join().unwrap();
recv_2.join().unwrap();
let mut messages1 = vec![];
let mut messages2 = vec![];
while let Ok((thread_no, val)) = receiver_vals.try_recv() {
    if thread_no == 1 {
        messages1.push(val)
    } else {
        messages2.push(val)
    }
}
// Both receivers got all messages
assert_eq!(messages1.len(), 4);
assert_eq!(messages2.len(), 4);

for i in [1, 2, 3, 4] {
    assert!(messages1.contains(&i));
    assert!(messages2.contains(&i));
}

Modules§

error
Error types
prelude

Structs§

Channel
A channel handle for the broadcast channel that allows sending messages to all receivers.
Receiver
A receiver handle for the broadcast channel that allows for consuming messages.