use crate::backoff::*;
use crate::flavor::{Flavor, FlavorBounded, FlavorImpl, FlavorNew, FlavorWrap};
use crate::shared::{check_timeout, ChannelShared};
use crate::waker::WakerState;
use crate::waker_registry::{RegistrySend, SelectWaker, SelectWakerWrapper};
use crate::BlockingRxTrait;
use crate::SenderType;
use crate::{RecvError, RecvTimeoutError, TryRecvError};
use std::cell::Cell;
use std::fmt;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
pub const DEFAULT_WEIGHT: u32 = 128;
pub type Mux<F> = FlavorWrap<F, <F as Flavor>::Send, SelectWakerWrapper>;
pub struct Multiplex<F: Flavor> {
waker: Arc<SelectWaker>,
handlers: Vec<MultiplexHandle<F>>,
last_idx: Cell<usize>,
count: Cell<u32>,
}
unsafe impl<F: Flavor> Send for Multiplex<F> {}
struct MultiplexHandle<F: Flavor> {
shared: Arc<ChannelShared<Mux<F>>>,
weight: u32,
}
impl<F: Flavor> Multiplex<F> {
pub fn new() -> Self {
Self {
waker: Arc::new(SelectWaker::new()),
handlers: Vec::with_capacity(10),
count: Cell::new(0),
last_idx: Cell::new(0),
}
}
#[inline]
fn _add_item(&mut self, flavor: F, weight: u32) -> Arc<ChannelShared<Mux<F>>> {
self.waker.add_opened();
let recvs = self.waker.clone().to_wrapper(self.handlers.len());
let shared = ChannelShared::new(Mux::<F>::from_inner(flavor), F::Send::new(), recvs);
self.handlers.push(MultiplexHandle { shared: shared.clone(), weight: weight - 1 });
self.last_idx.set(self.handlers.len() - 1);
shared
}
pub fn new_tx<S>(&mut self) -> S
where
F: FlavorNew,
S: SenderType<Flavor = Mux<F>>,
{
let shared = self._add_item(F::new(), DEFAULT_WEIGHT);
S::new(shared)
}
pub fn new_tx_with_weight<S>(&mut self, weight: u32) -> S
where
F: FlavorNew,
S: SenderType<Flavor = Mux<F>>,
{
let shared = self._add_item(F::new(), weight);
S::new(shared)
}
pub fn bounded_tx<S>(&mut self, size: usize) -> S
where
F: FlavorBounded,
S: SenderType<Flavor = Mux<F>>,
{
let shared = self._add_item(F::new_with_bound(size), DEFAULT_WEIGHT);
S::new(shared)
}
pub fn bounded_tx_with_weight<S>(&mut self, size: usize, weight: u32) -> S
where
F: FlavorBounded,
S: SenderType<Flavor = Mux<F>>,
{
let shared = self._add_item(F::new_with_bound(size), weight);
S::new(shared)
}
#[inline]
pub fn try_recv(&self) -> Result<F::Item, TryRecvError> {
let last_idx = self.last_idx.get();
if let Some(item) = self._try_select_all::<true>(last_idx, self.handlers.len()) {
return Ok(item);
}
if self.waker.get_opened_count() == 0 {
return Err(TryRecvError::Disconnected);
}
Err(TryRecvError::Empty)
}
#[inline]
pub fn recv(&self) -> Result<F::Item, RecvError> {
match self._recv_blocking(None) {
Ok(item) => Ok(item),
Err(_) => Err(RecvError),
}
}
#[inline]
pub fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => match self._recv_blocking(Some(deadline)) {
Ok(item) => Ok(item),
Err(true) => Err(RecvTimeoutError::Disconnected),
Err(false) => Err(RecvTimeoutError::Timeout),
},
None => self.try_recv().map_err(|e| match e {
TryRecvError::Disconnected => RecvTimeoutError::Disconnected,
TryRecvError::Empty => RecvTimeoutError::Timeout,
}),
}
}
#[inline(always)]
fn _try_select_cached<const FINAL: bool>(&self) -> Result<F::Item, usize> {
let last_idx = self.last_idx.get();
let handle = unsafe { self.handlers.get_unchecked(last_idx) };
let count = self.count.get();
let loop_count = if count > 0 {
if let Some(msg) = handle.shared.inner.try_recv_cached() {
handle.shared.on_recv();
self.count.set(count - 1);
return Ok(msg);
}
self.handlers.len() - 1
} else {
self.handlers.len()
};
if let Some(item) = self._try_select_all::<FINAL>(last_idx, loop_count) {
return Ok(item);
}
Err(last_idx)
}
#[inline(always)]
fn _try_select_all<const FINAL: bool>(
&self, mut idx: usize, loop_count: usize,
) -> Option<F::Item> {
let len = self.handlers.len();
for _ in 0..loop_count {
idx = if idx + 1 >= len { 0 } else { idx + 1 };
let handle = unsafe { self.handlers.get_unchecked(idx) };
if let Some(msg) = if FINAL {
handle.shared.inner.try_recv_final()
} else {
handle.shared.inner.try_recv()
} {
handle.shared.on_recv();
self.count.set(handle.weight);
self.last_idx.set(idx);
return Some(msg);
}
}
None
}
#[inline]
fn _recv_blocking(&self, deadline: Option<Instant>) -> Result<F::Item, bool> {
let mut start_idx;
match self._try_select_cached::<false>() {
Ok(item) => return Ok(item),
Err(idx) => {
start_idx = idx;
}
}
let mut backoff = Backoff::from(BackoffConfig::detect());
backoff.snooze();
let len = self.handlers.len();
loop {
loop {
if let Some(item) = self._try_select_all::<false>(start_idx, len) {
return Ok(item);
}
if backoff.snooze() {
break;
}
}
self.waker.init_blocking();
let closing = self.waker.get_opened_count() == 0;
if let Some(item) = self._try_select_all::<true>(start_idx, len) {
return Ok(item);
}
if closing {
return Err(true);
}
let mut state = WakerState::Init as u8;
while state < WakerState::Woken as u8 {
match check_timeout(deadline) {
Ok(None) => {
thread::park();
}
Ok(Some(dur)) => {
thread::park_timeout(dur);
}
Err(_) => {
return Err(false);
}
}
state = self.waker.get_waker_state(Ordering::SeqCst);
}
backoff.reset();
start_idx = self.waker.get_hint();
}
}
}
impl<F: Flavor> Drop for Multiplex<F> {
#[inline]
fn drop(&mut self) {
for handle in &self.handlers {
handle.shared.close_rx();
}
}
}
impl<F: Flavor> fmt::Debug for Multiplex<F> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Multiplex<{}>", std::any::type_name::<F>())
}
}
impl<F: Flavor> fmt::Display for Multiplex<F> {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(self, f)
}
}
impl<F: Flavor> BlockingRxTrait<F::Item> for Multiplex<F>
where
F::Item: Send + 'static,
{
#[inline(always)]
fn recv(&self) -> Result<F::Item, RecvError> {
Self::recv(self)
}
#[inline(always)]
fn try_recv(&self) -> Result<F::Item, TryRecvError> {
Self::try_recv(self)
}
#[inline(always)]
fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError> {
Self::recv_timeout(self, timeout)
}
#[inline(always)]
fn len(&self) -> usize {
0
}
#[inline(always)]
fn capacity(&self) -> Option<usize> {
None
}
#[inline(always)]
fn is_empty(&self) -> bool {
for handle in &self.handlers {
if !handle.shared.is_empty() {
return false;
}
}
true
}
#[inline(always)]
fn is_full(&self) -> bool {
false
}
#[inline(always)]
fn is_disconnected(&self) -> bool {
self.get_tx_count() == 0
}
#[inline(always)]
fn get_tx_count(&self) -> usize {
self.waker.get_opened_count()
}
#[inline(always)]
fn get_rx_count(&self) -> usize {
1
}
fn get_wakers_count(&self) -> (usize, usize) {
(0, 0)
}
fn clone_to_vec(self, _count: usize) -> Vec<Self> {
unimplemented!();
}
}