use super::SelectMode;
use crate::backoff::*;
use crate::flavor::Token;
use crate::shared::{check_timeout, ChannelShared};
use crate::trace_log;
use crate::waker::WakerState;
use crate::waker_registry::SelectWaker;
use crate::ReceiverType;
use crate::{RecvError, RecvTimeoutError, TryRecvError};
use smallvec::SmallVec;
use std::collections::hash_map::DefaultHasher;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::ops::Add;
use std::sync::{atomic::Ordering, Arc};
use std::thread;
use std::time::{Duration, Instant};
/// Waits on several receivers at once and reports which one produced a result.
///
/// Receivers are borrowed for `'a`, so they must outlive the selector.
pub struct Select<'a> {
/// One handle per receiver passed to `add`, in insertion order (32 stored inline).
handlers: SmallVec<[RecvHandle<'a>; 32]>,
/// Waker handed to every channel when registering or cancelling interest.
waker: Arc<SelectWaker>,
/// Policy that decides which handler is polled first.
mode: SelectMode,
/// Round-robin cursor: the position right after the handler that last succeeded.
next_index: usize,
/// xorshift state used to pick the start position in `SelectMode::Rand`; 0 in other modes.
rng: u64,
}
impl<'a> Select<'a> {
pub fn new() -> Self {
Self::new_with(SelectMode::RR)
}
#[inline]
pub fn new_random() -> Self {
Self::new_with(SelectMode::Rand)
}
#[inline]
pub fn new_bias() -> Self {
Self::new_with(SelectMode::Bias)
}
/// Creates an empty selector that uses `mode` to choose where each poll starts.
///
/// For `SelectMode::Rand` the generator state is seeded from a hash of the
/// current instant and the current thread id. Other modes never read the state
/// and leave it at 0.
#[inline]
pub fn new_with(mode: SelectMode) -> Self {
    let rng = if matches!(mode, SelectMode::Rand) {
        let mut hasher = DefaultHasher::new();
        Instant::now().hash(&mut hasher);
        thread::current().id().hash(&mut hasher);
        // The xorshift step used to pick the start position maps 0 to 0, so a
        // zero seed would pin random mode to index 0 forever. Substitute a
        // fixed non-zero constant in that (unlikely) case.
        match hasher.finish() {
            0 => 0x9E37_79B9_7F4A_7C15,
            seed => seed,
        }
    } else {
        0
    };
    Self {
        mode,
        handlers: SmallVec::new(),
        waker: Arc::new(SelectWaker::new()),
        next_index: 0,
        rng,
    }
}
/// Adds `recv` to the set of receivers this selector polls.
///
/// The receiver's address is stored as its identity; `remove` and
/// `SelectResult::is_from` compare against that address. No waker is
/// registered here: the new handle starts out unregistered.
#[inline]
pub fn add<R: ReceiverType>(&mut self, recv: &'a R)
where
    ChannelShared<R::Flavor>: SelectHandle,
{
    let channel = (recv as *const R).cast::<u8>();
    let shared: &ChannelShared<R::Flavor> = recv.as_ref();
    let handle = RecvHandle {
        shared: shared as &dyn SelectHandle,
        registered: false,
        channel,
    };
    self.handlers.push(handle);
}
/// Removes `recv` from the selector; does nothing if it was never added.
///
/// Only the first handle whose stored address matches `recv` is removed.
/// Its waker is cancelled, and every remaining handle is cancelled and marked
/// unregistered as well: wakers are registered with the handle's position as
/// channel id, and positions after the removed slot have just shifted.
pub fn remove<R: ReceiverType>(&mut self, recv: &R) {
    let channel = recv as *const R as *const u8;
    let index = match self.handlers.iter().position(|h| h.channel == channel) {
        Some(index) => index,
        None => return,
    };
    self.handlers[index].shared.cancel_waker(&self.waker);
    self.handlers.remove(index);
    if self.handlers.is_empty() {
        return;
    }
    // Handlers behind the removed slot moved one position left. Move the
    // round-robin cursor with them so the receiver that was next in line is
    // not skipped for a round.
    if index < self.next_index {
        self.next_index -= 1;
    }
    if self.next_index >= self.handlers.len() {
        self.next_index = 0;
    }
    for handler in &mut self.handlers {
        handler.registered = false;
        handler.shared.cancel_waker(&self.waker);
    }
}
/// Polls every receiver once without blocking.
///
/// # Errors
/// * `TryRecvError::Disconnected` when no receiver has been added.
/// * `TryRecvError::Empty` when no receiver produced a result.
pub fn try_select(&mut self) -> Result<SelectResult, TryRecvError> {
    if self.handlers.is_empty() {
        return Err(TryRecvError::Disconnected);
    }
    let start = self._try_select_begin();
    self._try_select(start, true).ok_or(TryRecvError::Empty)
}
/// Polls each handler exactly once, starting at `idx` and wrapping around.
///
/// An out-of-range `idx` restarts at position 0. On success in round-robin
/// mode the cursor is moved to the position after the handler that
/// succeeded. `final_check` is forwarded unchanged to every handler.
#[inline(always)]
fn _try_select(&mut self, idx: usize, final_check: bool) -> Option<SelectResult> {
    let len = self.handlers.len();
    debug_assert!(len > 0);
    let start = if idx >= len { 0 } else { idx };
    for step in 0..len {
        // `start < len` and `step < len`, so this walks start, start+1, ..,
        // len-1, 0, .. exactly like an incrementing index that wraps at `len`.
        let slot = (start + step) % len;
        match self.handlers[slot].try_select(final_check) {
            Ok(res) => {
                trace_log!("select ok idx={}", slot);
                if self.mode == SelectMode::RR {
                    self.next_index = slot + 1;
                }
                return Some(res);
            }
            Err(_) => {
                if final_check {
                    trace_log!("select: final_check {}", slot);
                }
            }
        }
    }
    None
}
/// Chooses the handler position at which the next poll starts.
///
/// * `Bias`: always position 0.
/// * `RR`: the saved cursor, or 0 once the cursor has run past the end.
/// * `Rand`: advances the xorshift state and reduces it modulo the handler
///   count; callers must ensure at least one handler exists.
#[inline(always)]
fn _try_select_begin(&mut self) -> usize {
    let count = self.handlers.len();
    match self.mode {
        SelectMode::Bias => 0,
        SelectMode::RR if self.next_index < count => self.next_index,
        SelectMode::RR => 0,
        SelectMode::Rand => {
            let mut state = self.rng;
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
            self.rng = state;
            (state as usize) % count
        }
    }
}
/// Blocks without a deadline until one of the receivers produces a result.
///
/// # Errors
/// Returns `RecvError` when the selector has no receivers.
pub fn select(&mut self) -> Result<SelectResult, RecvError> {
    self._select_blocking(None).map_err(|disconnected| {
        if disconnected {
            RecvError
        } else {
            // `false` is only reported for an expired deadline, and none was given.
            unreachable!()
        }
    })
}
/// Blocks until one of the receivers produces a result or `timeout` elapses.
///
/// A `timeout` too large to be represented as a deadline (for example
/// `Duration::MAX`) is treated as "no deadline" instead of panicking.
///
/// # Errors
/// * `RecvTimeoutError::Disconnected` when the selector has no receivers.
/// * `RecvTimeoutError::Timeout` when the deadline passed first.
pub fn select_timeout(&mut self, timeout: Duration) -> Result<SelectResult, RecvTimeoutError> {
    // `Instant + Duration` panics on overflow. `checked_add` yields `None`
    // instead, which is the same value `select` passes to wait without a
    // deadline.
    let deadline = Instant::now().checked_add(timeout);
    self._select_blocking(deadline).map_err(|disconnected| {
        if disconnected {
            RecvTimeoutError::Disconnected
        } else {
            RecvTimeoutError::Timeout
        }
    })
}
/// Shared blocking loop behind `select` and `select_timeout`.
///
/// `deadline` is handed to `check_timeout` before every park; `None` is what
/// `select` passes when it has no deadline.
///
/// The error value is a flag: `Err(true)` means there are no handlers (callers
/// map it to their "disconnected" error), `Err(false)` means `check_timeout`
/// reported an error (callers map it to a timeout).
#[inline(always)]
fn _select_blocking(&mut self, deadline: Option<Instant>) -> Result<SelectResult, bool> {
if self.handlers.is_empty() {
return Err(true); }
// Fast path: one non-final pass starting at the mode-dependent position.
let mut idx = self._try_select_begin();
if let Some(res) = self._try_select(idx, false) {
return Ok(res);
}
let mut backoff = Backoff::from(BackoffConfig::detect());
backoff.snooze();
loop {
// Spin phase: keep polling until `snooze` returns true.
// NOTE(review): presumably `true` means the backoff budget is used up — confirm in `Backoff`.
loop {
if let Some(res) = self._try_select(idx, false) {
return Ok(res);
}
if backoff.snooze() {
break;
}
}
// Prepare to park: reset the waker, then register it with every handler
// that is not registered yet, using the handler's position as channel id.
self.waker.init_blocking();
for (i, handler) in self.handlers.iter_mut().enumerate() {
handler.reg_waker(i, &self.waker);
}
// Poll once more with `final_check = true` after registration.
// NOTE(review): presumably this closes the window between the last poll and
// registration, where a message could otherwise be missed — confirm.
if let Some(res) = self._try_select(idx, true) {
return Ok(res);
}
trace_log!("select: park");
// Park until the waker state reaches at least `Woken`; re-reading the state
// after every wake-up means an early return from `park` just parks again.
let mut state = WakerState::Init as u8;
while state < WakerState::Woken as u8 {
match check_timeout(deadline) {
Ok(None) => {
std::thread::park();
}
Ok(Some(dur)) => {
std::thread::park_timeout(dur);
}
Err(_) => {
// Reported to callers as a timeout.
return Err(false);
}
}
state = self.waker.get_waker_state(Ordering::SeqCst);
trace_log!("select: unpark state={}", state);
}
// Resume polling at the position the waker recorded; `_try_select` restarts
// at 0 if that position is out of range.
idx = self.waker.get_hint();
trace_log!("select: hint idx {}", idx);
}
}
}
impl<'a> Drop for Select<'a> {
    /// Cancels this selector's waker on every channel it was attached to,
    /// whether or not a registration is currently recorded for the handle.
    #[inline(always)]
    fn drop(&mut self) {
        let waker = &self.waker;
        self.handlers
            .iter()
            .for_each(|handle| handle.shared.cancel_waker(waker));
    }
}
impl<'a> std::fmt::Debug for Select<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Select")
}
}
/// Per-receiver bookkeeping kept by `Select`.
struct RecvHandle<'a> {
/// Type-erased view of the receiver's channel state.
shared: &'a dyn SelectHandle,
/// Set once `reg_waker` on the channel has returned true; cleared by `Select::remove`.
registered: bool,
/// Address of the receiver, used only as an identity for comparisons in this file.
channel: *const u8,
}
impl<'a> RecvHandle<'a> {
    /// Polls the underlying channel once and, on success, tags the returned
    /// token with this receiver's address.
    #[inline(always)]
    fn try_select(&self, final_check: bool) -> Result<SelectResult, ()> {
        let channel = self.channel;
        self.shared
            .try_select(final_check)
            .map(|token| SelectResult { channel, token })
            .ok_or(())
    }

    /// Registers `global_waker` with the channel under id `index`, unless a
    /// registration is already recorded. The flag is set only when the channel
    /// reports success.
    #[inline(always)]
    fn reg_waker(&mut self, index: usize, global_waker: &Arc<SelectWaker>) {
        if !self.registered && self.shared.reg_waker(index, global_waker) {
            trace_log!("select: reg waker");
            self.registered = true;
        }
    }
}
/// Outcome of a successful select: identifies the receiver that produced it and
/// carries the token returned by that channel.
pub struct SelectResult {
/// Address of the receiver this result came from; compared by `is_from`.
pub(crate) channel: *const u8,
/// Token returned by the channel's `try_select`.
// NOTE(review): presumably consumed later to complete the receive — confirm against callers.
pub(crate) token: Token,
}
impl fmt::Debug for SelectResult {
    /// Prints the address of the originating receiver; the token is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("SelectResult(from {:p})", self.channel))
    }
}
impl SelectResult {
    /// Returns `true` when this result was produced by `rx`.
    ///
    /// The comparison is by address, so `rx` must be the same receiver object
    /// that was passed to `Select::add`.
    #[inline]
    pub fn is_from<R: ReceiverType>(&self, rx: &R) -> bool {
        let candidate = (rx as *const R).cast::<u8>();
        self.channel == candidate
    }
}
impl<R: ReceiverType> PartialEq<R> for SelectResult {
#[inline]
fn eq(&self, other: &R) -> bool {
self.is_from(other)
}
}
/// Operations a channel must provide so that `Select` can poll it and park on it.
// NOTE(review): the reason for allowing `private_bounds` is not visible in this
// file — confirm the attribute is still needed here.
#[allow(private_bounds)]
pub(crate) trait SelectHandle: Send {
    /// Polls the channel once; `Some(token)` is wrapped into a `SelectResult` by the caller.
    /// `final_check` is true for the poll made right after waker registration and for
    /// the non-blocking `Select::try_select`.
fn try_select(&self, final_check: bool) -> Option<Token>;
    /// Registers `waker` under `channel_id` (the handler's position in the selector).
    /// The caller records the registration only when this returns true.
fn reg_waker(&self, channel_id: usize, waker: &Arc<SelectWaker>) -> bool;
    /// Cancels `waker` on this channel; called on removal and when the selector is dropped.
fn cancel_waker(&self, waker: &Arc<SelectWaker>);
}