Multiplex

Struct Multiplex 

Source
pub struct Multiplex<F: Flavor> { /* private fields */ }

A multiplexer that owns the receivers of multiple channels, all of the same Flavor type.

Unlike select, it focuses on round-robin mode and allows a weight to be specified for each channel. It maintains a count of the messages received from each channel: if the last message was received from the channel at idx, it keeps trying that same channel until the number of messages received equals the channel's weight. If the channel is empty, it tries the next one without touching the count. This strategy improves the CPU cache hit rate and ensures that no channel is starved.

NOTE: The default weight is 128. (When the weight of every channel is set to 1, performance is at its worst because of CPU cache thrashing.)
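The strategy described above can be sketched with plain queues from the standard library. This is a minimal illustration, not crossfire's implementation: `Lane`, `WeightedRoundRobin`, `try_next` and `drain` are hypothetical names, and `VecDeque` stands in for a real channel.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for one multiplexed channel.
struct Lane {
    queue: VecDeque<u32>,
    weight: u32, // messages to take before the turn moves on
    count: u32,  // messages taken so far in the current turn
}

impl Lane {
    fn new(weight: u32, items: &[u32]) -> Self {
        Lane { queue: items.iter().copied().collect(), weight, count: 0 }
    }
}

/// Hypothetical weighted round-robin receiver (an assumption, not crossfire's code).
struct WeightedRoundRobin {
    lanes: Vec<Lane>,
    idx: usize, // lane that is tried first on the next receive
}

impl WeightedRoundRobin {
    fn new(lanes: Vec<Lane>) -> Self {
        WeightedRoundRobin { lanes, idx: 0 }
    }

    fn try_next(&mut self) -> Option<u32> {
        let n = self.lanes.len();
        // Probe every lane at most once, starting with the current one.
        for _ in 0..n {
            let lane = &mut self.lanes[self.idx];
            if let Some(msg) = lane.queue.pop_front() {
                lane.count += 1;
                if lane.count >= lane.weight {
                    // Weight used up: reset and hand the turn to the next lane.
                    lane.count = 0;
                    self.idx = (self.idx + 1) % n;
                }
                return Some(msg);
            }
            // Empty lane: move on without touching its count.
            self.idx = (self.idx + 1) % n;
        }
        None
    }

    /// Receives until every lane is empty and returns the observed order.
    fn drain(&mut self) -> Vec<u32> {
        let mut out = Vec::new();
        while let Some(msg) = self.try_next() {
            out.push(msg);
        }
        out
    }
}

fn main() {
    let mut rr = WeightedRoundRobin::new(vec![
        Lane::new(2, &[1, 2, 3]),
        Lane::new(1, &[10, 20]),
    ]);
    println!("{:?}", rr.drain());
}
```

With weights 2 and 1, the first lane is served twice in a row before the second lane gets its single turn, so the drain order is 1, 2, 10, 3, 20. Larger weights mean longer runs on the same queue, which is the cache-friendliness argument made above.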

§Capabilities and limitations

  • New channels may be added on the fly.
  • This abstraction is designed for a stable set of channels, which is where select is most efficient.
  • If a channel is closed by its sender, its receiver is closed automatically inside the Multiplex. The user is not notified until all of the channels are closed.
  • Because it is bound to the Flavor interface, it cannot be used across different flavor types. If you want to multiplex between list and array, you can use CompatFlavor.
  • NOTE: It has interior mutability because it needs to implement BlockingRxTrait, while adding a channel still requires &mut self. Because Multiplex is a single consumer just like Rx, it is not Sync. If you can guarantee that there is no concurrent access, you can manually add Sync back in the parent struct.
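The last point, adding Sync back in a parent struct, can be sketched as follows. This is a minimal illustration under stated assumptions: `SingleConsumer` is a hypothetical stand-in for a `!Sync` receiver (a `Cell` gives it interior mutability), and `Worker`, `gate` and `poll_exclusive` are invented names. The `unsafe impl` is only sound because every access goes through the mutex.

```rust
use std::cell::Cell;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical single-consumer type; the `Cell` makes it `!Sync`.
struct SingleConsumer {
    received: Cell<u32>,
}

impl SingleConsumer {
    fn poll(&self) -> u32 {
        let n = self.received.get() + 1;
        self.received.set(n);
        n
    }
}

/// Hypothetical parent struct that promises exclusive access to `rx`.
struct Worker {
    rx: SingleConsumer,
    gate: Mutex<()>,
}

// SAFETY: `rx` is only touched while `gate` is held, so no two threads
// ever access the `Cell` at the same time.
unsafe impl Sync for Worker {}

impl Worker {
    fn new() -> Self {
        Worker { rx: SingleConsumer { received: Cell::new(0) }, gate: Mutex::new(()) }
    }

    fn poll_exclusive(&self) -> u32 {
        let _guard = self.gate.lock().unwrap();
        self.rx.poll()
    }

    fn total(&self) -> u32 {
        let _guard = self.gate.lock().unwrap();
        self.rx.received.get()
    }
}

/// Polls the shared worker from several threads and returns the final count.
fn run_workers(threads: u32, polls_per_thread: u32) -> u32 {
    let worker = Arc::new(Worker::new());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let w = Arc::clone(&worker);
            thread::spawn(move || {
                for _ in 0..polls_per_thread {
                    w.poll_exclusive();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    worker.total()
}

fn main() {
    println!("total polls: {}", run_workers(4, 100));
}
```

The guarantee lives entirely in the parent type: if any code path reaches `rx` without holding `gate`, the `unsafe impl Sync` becomes unsound.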

§Examples

Basic usage with multiple senders:

use crossfire::{mpsc::Array, MTx, select::{Multiplex, Mux}};
use std::thread;

// Create a multiplexer with Array flavor
let mut mp = Multiplex::<Array<i32>>::new();

// Create multiple senders through the multiplexer
let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);

// Send values from different threads
let h1 = thread::spawn(move || {
    tx1.send(1).unwrap();
});
let h2 = thread::spawn(move || {
    tx2.send(2).unwrap();
});

// Receive values through the multiplexer (order may vary)
let val1 = mp.recv().unwrap();
let val2 = mp.recv().unwrap();

h1.join().unwrap();
h2.join().unwrap();

Implementations§

Source§

impl<F: Flavor> Multiplex<F>

Source

pub fn new() -> Self

Initializes the Multiplex with a fair, round-robin strategy.

Source

pub fn new_tx<S>(&mut self) -> S
where F: FlavorNew, S: SenderType<Flavor = Mux<F>>,

Adds a new channel, created with the flavor's new() method, to the multiplexer and returns its sender.

§Type Parameters
  • S - The sender type that implements SenderType with the appropriate Flavor. It may be an async or blocking sender, MP or SP, as long as it matches the Flavor type.
§Note

This method is only available for flavors that implement the FlavorNew trait, such as the List and One flavors. For flavors like Array that don’t implement FlavorNew, use bounded_tx instead.
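The way a `where F: FlavorNew` clause makes a method available for some flavors only can be sketched with stand-in traits. Everything here is hypothetical and merely mirrors the shape of the signatures on this page: the trait bodies, `ListLike`, `ArrayLike`, `Registry`, `add_default` and `add_bounded` are not crossfire items.

```rust
/// Hypothetical stand-ins for the flavor traits named on this page.
trait Flavor {
    fn capacity(&self) -> Option<usize>;
}
trait FlavorNew: Flavor {
    fn new() -> Self;
}
trait FlavorBounded: Flavor {
    fn new_with_bound(size: usize) -> Self;
}

/// Unbounded flavor: can be built without arguments.
struct ListLike;
impl Flavor for ListLike {
    fn capacity(&self) -> Option<usize> {
        None
    }
}
impl FlavorNew for ListLike {
    fn new() -> Self {
        ListLike
    }
}

/// Bounded flavor: needs a size, so it does not implement `FlavorNew`.
struct ArrayLike {
    size: usize,
}
impl Flavor for ArrayLike {
    fn capacity(&self) -> Option<usize> {
        Some(self.size)
    }
}
impl FlavorBounded for ArrayLike {
    fn new_with_bound(size: usize) -> Self {
        ArrayLike { size }
    }
}

/// Hypothetical container whose methods are gated by `where` clauses.
struct Registry<F: Flavor> {
    channels: Vec<F>,
}

impl<F: Flavor> Registry<F> {
    fn new() -> Self {
        Registry { channels: Vec::new() }
    }

    /// Only callable when `F: FlavorNew`; returns the new channel's index.
    fn add_default(&mut self) -> usize
    where
        F: FlavorNew,
    {
        self.channels.push(F::new());
        self.channels.len() - 1
    }

    /// Only callable when `F: FlavorBounded`; returns the new channel's index.
    fn add_bounded(&mut self, size: usize) -> usize
    where
        F: FlavorBounded,
    {
        self.channels.push(F::new_with_bound(size));
        self.channels.len() - 1
    }

    fn capacities(&self) -> Vec<Option<usize>> {
        self.channels.iter().map(|c| c.capacity()).collect()
    }
}

fn main() {
    let mut lists = Registry::<ListLike>::new();
    lists.add_default();
    let mut arrays = Registry::<ArrayLike>::new();
    arrays.add_bounded(10);
    // arrays.add_default(); // would not compile: ArrayLike is not FlavorNew
    println!("{:?} {:?}", lists.capacities(), arrays.capacities());
}
```

The mismatch is caught at compile time, which is why calling the wrong constructor for a flavor shows up as a trait-bound error rather than a runtime failure.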

§Example

With mpsc::List (whose sender type is MTx, which can be cloned):

use crossfire::{mpsc::List, MTx, select::{Multiplex, Mux}};

let mut mp = Multiplex::<List<i32>>::new();
let tx1: MTx<Mux<List<i32>>> = mp.new_tx();
let tx2: MTx<Mux<List<i32>>> = mp.new_tx();
tx1.send(42).expect("send");
tx2.send(42).expect("send");
let value = mp.recv().unwrap();
assert_eq!(value, 42);
let value = mp.recv().unwrap();
assert_eq!(value, 42);

With spsc::One (whose sender type is Tx, which cannot be cloned):

use crossfire::{spsc::One, Tx, select::{Multiplex, Mux}};

let mut mp = Multiplex::<One<i32>>::new();
// Creates a size-1 channel; this sender is kept alive but not used below
let _tx1: Tx<Mux<One<i32>>> = mp.new_tx();
// Creates another size-1 channel
let tx2: Tx<Mux<One<i32>>> = mp.new_tx();
std::thread::spawn(move ||{
    tx2.send(42).expect("send");
});
let value = mp.recv().unwrap();
assert_eq!(value, 42);
Source

pub fn new_tx_with_weight<S>(&mut self, weight: u32) -> S
where F: FlavorNew, S: SenderType<Flavor = Mux<F>>,

Adds a channel of a flavor that implements FlavorNew, with a custom weight instead of the default (the default weight is 128).
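As a rough guide to choosing a weight: if every channel always has messages waiting, one full round takes `weight` messages from each channel, so a channel's share of the receives is its weight divided by the sum of all weights. This follows from the strategy described at the top of the page rather than from any documented guarantee, and `saturated_shares` is a hypothetical helper.

```rust
/// Hypothetical helper: expected share of receives per channel when
/// every channel is permanently non-empty (an assumption).
fn saturated_shares(weights: &[u32]) -> Vec<f64> {
    let total: u64 = weights.iter().map(|&w| u64::from(w)).sum();
    if total == 0 {
        // No weight at all: nothing meaningful to report.
        return vec![0.0; weights.len()];
    }
    weights
        .iter()
        .map(|&w| f64::from(w) / total as f64)
        .collect()
}

fn main() {
    // One channel at the default weight, one at three times the default.
    println!("{:?}", saturated_shares(&[128, 384]));
}
```

When channels are often empty, the actual split follows the traffic instead, because an empty channel is skipped.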

Source

pub fn bounded_tx<S>(&mut self, size: usize) -> S
where F: FlavorBounded, S: SenderType<Flavor = Mux<F>>,

Adds a new bounded channel to the multiplexer and returns its sender.

§Arguments
  • size - The maximum capacity of the channel
§Type Parameters
  • S - The sender type that implements SenderType with the appropriate Flavor
§Example
use crossfire::{mpsc::Array, MTx, select::{Multiplex, Mux}};

let mut mp = Multiplex::<Array<i32>>::new();
// Creates a bounded channel with capacity 10
let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
// Creates another bounded channel with capacity 20
let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(20);
tx1.send(42).expect("send");
std::thread::spawn(move || {
    tx2.send(42).expect("send");
});
let value = mp.recv().unwrap();
assert_eq!(value, 42);
let value = mp.recv().unwrap();
assert_eq!(value, 42);
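What a fixed capacity means for a sender can be illustrated with the standard library's bounded channel. This is an analogy using `std::sync::mpsc::sync_channel`, not crossfire's API, and `fill` is a hypothetical helper.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries `attempts` non-blocking sends into a channel of the given capacity
/// and returns how many were accepted and how many were rejected as full.
fn fill(capacity: usize, attempts: u32) -> (u32, u32) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    // The receiver stays alive until here so the sends never see a disconnect.
    drop(rx);
    (accepted, rejected)
}

fn main() {
    let (accepted, rejected) = fill(2, 5);
    println!("accepted {accepted}, rejected {rejected}");
}
```

A blocking send would wait for the receiver instead of failing once the buffer is full.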
Source

pub fn bounded_tx_with_weight<S>(&mut self, size: usize, weight: u32) -> S
where F: FlavorBounded, S: SenderType<Flavor = Mux<F>>,

Adds a bounded channel to the multiplexer with a custom weight (the default is 128).

Source

pub fn try_recv(&self) -> Result<F::Item, TryRecvError>

Attempts to receive a message from any of the multiplexed channels without blocking.

Returns Ok(item) if a message is available on any of the channels. Returns Err(TryRecvError::Empty) if no messages are available. Returns Err(TryRecvError::Disconnected) if all senders have been dropped.

§Example
use crossfire::{mpsc::Array, select::{Multiplex, Mux}, MTx, TryRecvError};

let mut mp = Multiplex::<Array<i32>>::new();
let tx1: MTx<Mux<_>> = mp.bounded_tx(10);
let _tx2: MTx<Mux<_>> = mp.bounded_tx(10);
// No message available yet
assert_eq!(mp.try_recv(), Err(TryRecvError::Empty));
tx1.send(42).unwrap();
// Now a message is available
assert_eq!(mp.try_recv(), Ok(42));
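The return values described for try_recv (Empty while at least one sender is alive, Disconnected only when every channel is closed) can be modeled over a slice of standard-library receivers. This is a sketch of the semantics only: `try_recv_any` is a hypothetical function, it scans in fixed order rather than weighted round-robin, and it uses `std::sync::mpsc` instead of crossfire.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical helper: non-blocking receive from the first channel that
/// has a message.
fn try_recv_any<T>(receivers: &[Receiver<T>]) -> Result<T, TryRecvError> {
    let mut any_connected = false;
    for rx in receivers {
        match rx.try_recv() {
            Ok(item) => return Ok(item),
            // Empty but its sender is still alive.
            Err(TryRecvError::Empty) => any_connected = true,
            // This channel is closed; keep looking at the others.
            Err(TryRecvError::Disconnected) => {}
        }
    }
    if any_connected {
        Err(TryRecvError::Empty)
    } else {
        Err(TryRecvError::Disconnected)
    }
}

fn main() {
    let (tx1, rx1) = channel::<i32>();
    let (tx2, rx2) = channel::<i32>();
    let receivers = vec![rx1, rx2];

    println!("{:?}", try_recv_any(&receivers)); // nothing sent yet
    tx2.send(7).unwrap();
    println!("{:?}", try_recv_any(&receivers)); // message from the second channel
    drop(tx1);
    println!("{:?}", try_recv_any(&receivers)); // one sender is still alive
    drop(tx2);
    println!("{:?}", try_recv_any(&receivers)); // every sender is gone
}
```

Closing a single channel is invisible to the caller, which matches the limitation listed at the top of the page.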
Source

pub fn recv(&self) -> Result<F::Item, RecvError>

Receives a message from any of the multiplexed channels, blocking if necessary.

This method will block the current thread until a message is available on any of the channels, or until all senders are dropped.

Source

pub fn recv_timeout( &self, timeout: Duration, ) -> Result<F::Item, RecvTimeoutError>

Receives a message from any of the multiplexed channels with a timeout. Blocks while all channels are empty.

The behavior is atomic: the message is either received successfully or the operation is canceled due to a timeout.

Returns Ok(T) when successful.

Returns Err(RecvTimeoutError::Timeout) when a message could not be received because all channels were empty and the operation timed out.

Returns Err(RecvTimeoutError::Disconnected) if all senders have been dropped and the channels are empty.
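The three outcomes can be seen with a single standard-library channel, whose `recv_timeout` has the same result shape. This is an analogy rather than crossfire code, and `classify` and `demo` are hypothetical helpers.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError};
use std::time::Duration;

/// Turns a timed receive result into a short label.
fn classify(result: Result<i32, RecvTimeoutError>) -> String {
    match result {
        Ok(value) => format!("received {}", value),
        Err(RecvTimeoutError::Timeout) => "timed out".to_string(),
        Err(RecvTimeoutError::Disconnected) => "disconnected".to_string(),
    }
}

/// Walks one channel through every outcome of a timed receive.
fn demo() -> Vec<String> {
    let (tx, rx) = channel::<i32>();
    let wait = Duration::from_millis(20);
    let mut seen = Vec::new();

    // Empty channel, sender still alive: the call gives up after `wait`.
    seen.push(classify(rx.recv_timeout(wait)));

    tx.send(1).unwrap();
    seen.push(classify(rx.recv_timeout(wait)));

    // A message that is already buffered is still delivered after the
    // sender is dropped.
    tx.send(2).unwrap();
    drop(tx);
    seen.push(classify(rx.recv_timeout(wait)));

    // Sender dropped and channel empty.
    seen.push(classify(rx.recv_timeout(wait)));
    seen
}

fn main() {
    for line in demo() {
        println!("{}", line);
    }
}
```

Disconnected is reported only once the buffer has been drained, so dropping a sender never loses messages that were already queued.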

Trait Implementations§

Source§

impl<F: Flavor> BlockingRxTrait<<F as Queue>::Item> for Multiplex<F>

Source§

fn len(&self) -> usize

The number of messages in the channel at the moment

Source§

fn capacity(&self) -> Option<usize>

Always returns None.

Source§

fn is_empty(&self) -> bool

Returns true when all the channels are empty.

Source§

fn is_full(&self) -> bool

Not practical to implement.

Source§

fn is_disconnected(&self) -> bool

Returns true if all senders have been closed.

Source§

fn get_tx_count(&self) -> usize

NOTE: It does not count the clones of the senders.

Source§

fn get_rx_count(&self) -> usize

This is a single consumer.

Source§

fn recv(&self) -> Result<F::Item, RecvError>

Receives a message from the channel. This method will block until a message is received or the channel is closed.
Source§

fn try_recv(&self) -> Result<F::Item, TryRecvError>

Attempts to receive a message from the channel without blocking.
Source§

fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError>

Receives a message from the channel with a timeout. Will block when the channel is empty.
Source§

fn get_wakers_count(&self) -> (usize, usize)

Source§

fn clone_to_vec(self, _count: usize) -> Vec<Self>

Source§

impl<F: Flavor> Debug for Multiplex<F>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
Source§

impl<F: Flavor> Display for Multiplex<F>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
Source§

impl<F: Flavor> Drop for Multiplex<F>

Source§

fn drop(&mut self)

Executes the destructor for this type.
Source§

impl<F: Flavor> Send for Multiplex<F>

Auto Trait Implementations§

§

impl<F> !Freeze for Multiplex<F>

§

impl<F> !RefUnwindSafe for Multiplex<F>

§

impl<F> !Sync for Multiplex<F>

§

impl<F> Unpin for Multiplex<F>

§

impl<F> !UnwindSafe for Multiplex<F>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToString for T
where T: Display + ?Sized,

Source§

fn to_string(&self) -> String

Converts the given value to a String.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.