pub struct Multiplex<F: Flavor> { /* private fields */ }
A multiplexer that owns multiple channel receivers of the same Flavor type.
Unlike select, it focuses on round-robin mode and allows a weight to be specified for each channel.
It maintains a count of the messages received for each channel.
That means if the last message was received on channel idx, it keeps trying the same channel
until a number of messages equal to the weight has been received. If the channel is empty, it tries the
next one without touching the count. This strategy improves the CPU cache hit rate and ensures no starvation.
NOTE: The default weight is 128. (When the weight of every channel is set to 1, performance is at its worst because of CPU cache thrashing.)
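The scheduling rule described above can be sketched with plain std queues. This is only one plausible reading of the description, not crossfire's actual implementation: the `Slot` and `WeightedRoundRobin` types, the `next_item` method, and the `round_robin_demo` function are hypothetical names invented for illustration, and `VecDeque` stands in for a channel receiver.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for one multiplexed channel.
struct Slot {
    queue: VecDeque<i32>,
    weight: u32, // how many messages to take before moving on
    count: u32,  // messages taken from this channel in its current turn
}

/// Hypothetical weighted round-robin picker (an assumption, not crossfire code).
struct WeightedRoundRobin {
    slots: Vec<Slot>,
    current: usize, // channel that is tried first on the next call
}

impl WeightedRoundRobin {
    fn next_item(&mut self) -> Option<i32> {
        let n = self.slots.len();
        // Try every channel at most once, starting from the current one.
        for _ in 0..n {
            let idx = self.current;
            let slot = &mut self.slots[idx];
            if let Some(item) = slot.queue.pop_front() {
                slot.count += 1;
                // Weight used up: reset the count and move to the next channel.
                if slot.count >= slot.weight {
                    slot.count = 0;
                    self.current = (idx + 1) % n;
                }
                return Some(item);
            }
            // Empty channel: try the next one, leaving its count untouched.
            self.current = (idx + 1) % n;
        }
        None
    }
}

/// Drains two channels with weights 2 and 1 and returns the receive order.
fn round_robin_demo() -> Vec<i32> {
    let mut rr = WeightedRoundRobin {
        slots: vec![
            Slot { queue: VecDeque::from(vec![1, 2, 3, 4]), weight: 2, count: 0 },
            Slot { queue: VecDeque::from(vec![10, 20, 30]), weight: 1, count: 0 },
        ],
        current: 0,
    };
    let mut order = Vec::new();
    while let Some(item) = rr.next_item() {
        order.push(item);
    }
    order
}
```

With these inputs, `round_robin_demo()` takes two messages from the first queue for every one from the second, until the first queue runs dry. A larger weight means longer runs on one queue, which is the cache-friendliness argument made above.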
§Capability and limitation:
- New channels may be added on the fly.
- This abstraction is designed for a stable set of channels, which is where selection is most efficient.
- If a channel is closed by its sender, its receiver is automatically closed inside the Multiplex; the user is not notified until all of its channels are closed.
- Because it is bound to the Flavor interface, it cannot be used across different flavor types. If you want to multiplex between list and array, you can use the CompatFlavor.
- NOTE: It uses interior mutability because it needs to implement BlockingRxTrait, whose receive methods take &self; adding a channel still requires &mut self. Because Multiplex is a single consumer just like Rx, it does not implement Sync. If you can guarantee there is no concurrent access, you can manually add Sync back in the parent struct.
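The last note can be illustrated with a minimal sketch that uses only std. `SingleConsumer` is a hypothetical stand-in for a single-consumer receiver such as Multiplex (its `Cell` field makes it `!Sync`), and `Worker`, `poll`, `assert_sync`, and `sync_demo` are likewise invented names. The `unsafe impl` is sound only under the stated assumption that the parent struct never lets two threads use the receiver at the same time.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a single-consumer receiver.
/// The `Cell` gives it interior mutability and makes it `!Sync`.
struct SingleConsumer {
    received: Cell<usize>,
}

impl SingleConsumer {
    /// Takes `&self` but mutates internal state, like a receive call would.
    fn poll(&self) -> usize {
        self.received.set(self.received.get() + 1);
        self.received.get()
    }
}

/// Hypothetical parent struct that owns the receiver.
struct Worker {
    rx: SingleConsumer,
}

// SAFETY (assumption of this sketch): at most one thread at a time ever calls
// into `rx`. The compiler no longer checks this; the author of `Worker` must.
unsafe impl Sync for Worker {}

/// Compiles only if `T` is `Sync`.
fn assert_sync<T: Sync>() {}

fn sync_demo() -> usize {
    assert_sync::<Worker>();
    let worker = Worker { rx: SingleConsumer { received: Cell::new(0) } };
    worker.rx.poll()
}
```

If the no-concurrent-access guarantee cannot be upheld, wrapping the receiver in a `Mutex` is the safe alternative, at the cost of locking on every receive.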
§Examples
Basic usage with multiple senders:
use crossfire::{mpsc::Array, MTx, select::{Multiplex, Mux}};
use std::thread;
// Create a multiplexer with Array flavor
let mut mp = Multiplex::<Array<i32>>::new();
// Create multiple senders through the multiplexer
let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
// Send values from different threads
let h1 = thread::spawn(move || {
    tx1.send(1).unwrap();
});
let h2 = thread::spawn(move || {
    tx2.send(2).unwrap();
});
// Receive values through the multiplexer (order may vary)
let val1 = mp.recv().unwrap();
let val2 = mp.recv().unwrap();
h1.join().unwrap();
h2.join().unwrap();
Implementations§
impl<F: Flavor> Multiplex<F>
pub fn new_tx<S>(&mut self) -> S
Adds a new channel, constructed with the flavor's new() method, to the multiplexer and returns its sender.
§Type Parameters
S - The sender type that implements SenderType with the appropriate Flavor. It may be an async or blocking sender, MP or SP, as long as it matches the Flavor type.
§Note
This method is only available for flavors that implement the FlavorNew trait,
such as the List and One flavors. For flavors like Array that don’t implement FlavorNew,
use bounded_tx instead.
§Example
With mpsc::List (whose sender type is MTx, which can be cloned):
use crossfire::{mpsc::List, MTx, select::{Multiplex, Mux}};
let mut mp = Multiplex::<List<i32>>::new();
let tx1: MTx<Mux<List<i32>>> = mp.new_tx();
let tx2: MTx<Mux<List<i32>>> = mp.new_tx();
tx1.send(42).expect("send");
tx2.send(42).expect("send");
let value = mp.recv().unwrap();
assert_eq!(value, 42);
let value = mp.recv().unwrap();
assert_eq!(value, 42);
With spsc::One (whose sender type is Tx, which is not cloneable):
use crossfire::{spsc::One, Tx, select::{Multiplex, Mux}};
let mut mp = Multiplex::<One<i32>>::new();
// Creates a size-1 channel
let tx1: Tx<Mux<One<i32>>> = mp.new_tx();
// Creates another size-1 channel
let tx2: Tx<Mux<One<i32>>> = mp.new_tx();
std::thread::spawn(move || {
    tx2.send(42).expect("send");
});
let value = mp.recv().unwrap();
assert_eq!(value, 42);
pub fn new_tx_with_weight<S>(&mut self, weight: u32) -> S
Adds a channel of a flavor that implements FlavorNew, with a custom weight instead of the default of 128.
pub fn bounded_tx<S>(&mut self, size: usize) -> S
Adds a new bounded channel to the multiplexer and returns its sender.
§Arguments
size - The maximum capacity of the channel.
§Type Parameters
S - The sender type that implements SenderType with the appropriate Flavor.
§Example
use crossfire::{mpsc::Array, *, select::{Multiplex, Mux}};
let mut mp = Multiplex::<Array<i32>>::new();
// Creates a bounded channel with capacity 10
let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
// Creates another bounded channel with capacity 20
let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(20);
tx1.send(42).expect("send");
std::thread::spawn(move || {
    tx2.send(42).expect("send");
});
let value = mp.recv().unwrap();
assert_eq!(value, 42);
let value = mp.recv().unwrap();
assert_eq!(value, 42);
pub fn bounded_tx_with_weight<S>(&mut self, size: usize, weight: u32) -> S
Adds a bounded channel to the multiplexer, with a custom weight instead of the default of 128.
pub fn try_recv(&self) -> Result<F::Item, TryRecvError>
Attempts to receive a message from any of the multiplexed channels without blocking.
Returns Ok(item) if a message is available on any of the channels.
Returns Err(TryRecvError::Empty) if no messages are available.
Returns Err(TryRecvError::Disconnected) if all senders have been dropped.
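The three outcomes, together with the rule that a closed channel is dropped silently until every channel is closed, can be sketched with std channels. This is an illustration under assumptions, not crossfire code: `try_recv_any` and `try_recv_demo` are hypothetical helpers, and `std::sync::mpsc` stands in for the multiplexed receivers. The sketch reports `Disconnected` for an empty receiver set, which may differ from how Multiplex treats the case of no channels at all.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical helper: polls each receiver once without blocking.
/// Closed and drained receivers are removed without notifying the caller.
fn try_recv_any<T>(receivers: &mut Vec<Receiver<T>>) -> Result<T, TryRecvError> {
    let mut i = 0;
    while i < receivers.len() {
        match receivers[i].try_recv() {
            Ok(item) => return Ok(item),
            Err(TryRecvError::Empty) => i += 1,
            // `remove` shifts the next receiver into slot `i`, so do not advance.
            Err(TryRecvError::Disconnected) => {
                receivers.remove(i);
            }
        }
    }
    if receivers.is_empty() {
        Err(TryRecvError::Disconnected) // every channel has been closed
    } else {
        Err(TryRecvError::Empty) // open channels remain, but none has data
    }
}

fn try_recv_demo() -> Vec<Result<i32, TryRecvError>> {
    let (tx1, rx1) = channel();
    let (tx2, rx2) = channel();
    let mut receivers = vec![rx1, rx2];
    let mut results = Vec::new();

    results.push(try_recv_any(&mut receivers)); // nothing sent yet
    tx2.send(7).unwrap();
    results.push(try_recv_any(&mut receivers)); // message from the second channel
    drop(tx1);
    results.push(try_recv_any(&mut receivers)); // first channel closed, second still open
    drop(tx2);
    results.push(try_recv_any(&mut receivers)); // all senders dropped
    results
}
```

Note that dropping `tx1` alone does not surface as an error: the caller sees `Empty` until the last sender is gone.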
§Example
use crossfire::{mpsc::Array, select::{Multiplex, Mux}, MTx, TryRecvError};
let mut mp = Multiplex::<Array<i32>>::new();
let tx1: MTx<Mux<_>> = mp.bounded_tx(10);
let _tx2: MTx<Mux<_>> = mp.bounded_tx(10);
// No message available yet
assert_eq!(mp.try_recv(), Err(TryRecvError::Empty));
tx1.send(42).unwrap();
// Now a message is available
assert_eq!(mp.try_recv(), Ok(42));
pub fn recv(&self) -> Result<F::Item, RecvError>
Receives a message from any of the multiplexed channels, blocking if necessary.
This method will block the current thread until a message is available on any of the channels, or until all senders are dropped.
pub fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError>
Receives a message from any of the multiplexed channels with a timeout. Blocks while all channels are empty.
The behavior is atomic: the message is either received successfully or the operation is canceled due to a timeout.
Returns Ok(item) when successful.
Returns Err(RecvTimeoutError::Timeout) when no message could be received because the channels were empty and the operation timed out.
Returns Err(RecvTimeoutError::Disconnected) if all senders have been dropped and the channels are empty.
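A caller usually matches on all three outcomes. The sketch below shows that pattern with a single `std::sync::mpsc` receiver as a stand-in, since its `recv_timeout` has the same three-way result; `describe` and `timeout_demo` are hypothetical names, and the 10 ms timeout is an arbitrary choice for the example.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical helper: waits up to `timeout` and reports which outcome occurred.
fn describe(rx: &Receiver<i32>, timeout: Duration) -> String {
    match rx.recv_timeout(timeout) {
        Ok(item) => format!("got {}", item),
        Err(RecvTimeoutError::Timeout) => "timed out".to_string(),
        Err(RecvTimeoutError::Disconnected) => "disconnected".to_string(),
    }
}

fn timeout_demo() -> Vec<String> {
    let (tx, rx) = channel();
    let wait = Duration::from_millis(10);
    let mut outcomes = Vec::new();

    tx.send(5).unwrap();
    outcomes.push(describe(&rx, wait)); // a message is already queued
    outcomes.push(describe(&rx, wait)); // queue empty, sender still alive
    drop(tx);
    outcomes.push(describe(&rx, wait)); // queue empty, sender dropped
    outcomes
}
```

A timeout is typically retried or used as a heartbeat tick, while a disconnect usually ends the receive loop.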
Trait Implementations§
impl<F: Flavor> BlockingRxTrait<<F as Queue>::Item> for Multiplex<F>
fn is_disconnected(&self) -> bool
Returns true if all senders have been closed.
fn get_tx_count(&self) -> usize
NOTE: This does not count clones of the senders.
fn get_rx_count(&self) -> usize
Multiplex is a single consumer.