use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::time::{Duration, Instant};
use std::vec::Vec;
use crossbeam_utils::Backoff;
use crate::channel::{self, Receiver, Sender};
use crate::context::Context;
use crate::err::{ReadyTimeoutError, TryReadyError};
use crate::err::{RecvError, SendError};
use crate::err::{SelectTimeoutError, TrySelectError};
use crate::flavors;
use crate::utils;
#[derive(Debug, Default)]
pub struct Token {
pub(crate) at: flavors::at::AtToken,
pub(crate) array: flavors::array::ArrayToken,
pub(crate) list: flavors::list::ListToken,
#[allow(dead_code)]
pub(crate) never: flavors::never::NeverToken,
pub(crate) tick: flavors::tick::TickToken,
pub(crate) zero: flavors::zero::ZeroToken,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Operation(usize);
impl Operation {
#[inline]
pub fn hook<T>(r: &mut T) -> Operation {
let val = r as *mut T as usize;
assert!(val > 2);
Operation(val)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selected {
Waiting,
Aborted,
Disconnected,
Operation(Operation),
}
impl From<usize> for Selected {
#[inline]
fn from(val: usize) -> Selected {
match val {
0 => Selected::Waiting,
1 => Selected::Aborted,
2 => Selected::Disconnected,
oper => Selected::Operation(Operation(oper)),
}
}
}
impl Into<usize> for Selected {
#[inline]
fn into(self) -> usize {
match self {
Selected::Waiting => 0,
Selected::Aborted => 1,
Selected::Disconnected => 2,
Selected::Operation(Operation(val)) => val,
}
}
}
pub trait SelectHandle {
fn try_select(&self, token: &mut Token) -> bool;
fn deadline(&self) -> Option<Instant>;
fn register(&self, oper: Operation, cx: &Context) -> bool;
fn unregister(&self, oper: Operation);
fn accept(&self, token: &mut Token, cx: &Context) -> bool;
fn is_ready(&self) -> bool;
fn watch(&self, oper: Operation, cx: &Context) -> bool;
fn unwatch(&self, oper: Operation);
}
impl<T: SelectHandle> SelectHandle for &T {
fn try_select(&self, token: &mut Token) -> bool {
(**self).try_select(token)
}
fn deadline(&self) -> Option<Instant> {
(**self).deadline()
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
(**self).register(oper, cx)
}
fn unregister(&self, oper: Operation) {
(**self).unregister(oper);
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
(**self).accept(token, cx)
}
fn is_ready(&self) -> bool {
(**self).is_ready()
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
(**self).watch(oper, cx)
}
fn unwatch(&self, oper: Operation) {
(**self).unwatch(oper)
}
}
#[derive(Clone, Copy, Eq, PartialEq)]
enum Timeout {
Now,
Never,
At(Instant),
}
fn run_select(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<(Token, usize, *const u8)> {
if handles.is_empty() {
match timeout {
Timeout::Now => return None,
Timeout::Never => {
utils::sleep_until(None);
unreachable!();
}
Timeout::At(when) => {
utils::sleep_until(Some(when));
return None;
}
}
}
if !is_biased {
utils::shuffle(handles);
}
let mut token = Token::default();
for &(handle, i, ptr) in handles.iter() {
if handle.try_select(&mut token) {
return Some((token, i, ptr));
}
}
loop {
let res = Context::with(|cx| {
let mut sel = Selected::Waiting;
let mut registered_count = 0;
let mut index_ready = None;
if let Timeout::Now = timeout {
cx.try_select(Selected::Aborted).unwrap();
}
for (handle, i, _) in handles.iter_mut() {
registered_count += 1;
if handle.register(Operation::hook::<&dyn SelectHandle>(handle), cx) {
sel = match cx.try_select(Selected::Aborted) {
Ok(()) => {
index_ready = Some(*i);
Selected::Aborted
}
Err(s) => s,
};
break;
}
sel = cx.selected();
if sel != Selected::Waiting {
break;
}
}
if sel == Selected::Waiting {
let mut deadline: Option<Instant> = match timeout {
Timeout::Now => return None,
Timeout::Never => None,
Timeout::At(when) => Some(when),
};
for &(handle, _, _) in handles.iter() {
if let Some(x) = handle.deadline() {
deadline = deadline.map(|y| x.min(y)).or(Some(x));
}
}
sel = cx.wait_until(deadline);
}
for (handle, _, _) in handles.iter_mut().take(registered_count) {
handle.unregister(Operation::hook::<&dyn SelectHandle>(handle));
}
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
if let Some(index_ready) = index_ready {
for &(handle, i, ptr) in handles.iter() {
if i == index_ready && handle.try_select(&mut token) {
return Some((i, ptr));
}
}
}
}
Selected::Disconnected => {}
Selected::Operation(_) => {
for (handle, i, ptr) in handles.iter_mut() {
if sel == Selected::Operation(Operation::hook::<&dyn SelectHandle>(handle))
{
if handle.accept(&mut token, cx) {
return Some((*i, *ptr));
}
}
}
}
}
None
});
if let Some((i, ptr)) = res {
return Some((token, i, ptr));
}
for &(handle, i, ptr) in handles.iter() {
if handle.try_select(&mut token) {
return Some((token, i, ptr));
}
}
match timeout {
Timeout::Now => return None,
Timeout::Never => {}
Timeout::At(when) => {
if Instant::now() >= when {
return None;
}
}
}
}
}
fn run_ready(
handles: &mut [(&dyn SelectHandle, usize, *const u8)],
timeout: Timeout,
is_biased: bool,
) -> Option<usize> {
if handles.is_empty() {
match timeout {
Timeout::Now => return None,
Timeout::Never => {
utils::sleep_until(None);
unreachable!();
}
Timeout::At(when) => {
utils::sleep_until(Some(when));
return None;
}
}
}
if !is_biased {
utils::shuffle(handles);
}
loop {
let backoff = Backoff::new();
loop {
for &(handle, i, _) in handles.iter() {
if handle.is_ready() {
return Some(i);
}
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
match timeout {
Timeout::Now => return None,
Timeout::Never => {}
Timeout::At(when) => {
if Instant::now() >= when {
return None;
}
}
}
let res = Context::with(|cx| {
let mut sel = Selected::Waiting;
let mut registered_count = 0;
for (handle, _, _) in handles.iter_mut() {
registered_count += 1;
let oper = Operation::hook::<&dyn SelectHandle>(handle);
if handle.watch(oper, cx) {
sel = match cx.try_select(Selected::Operation(oper)) {
Ok(()) => Selected::Operation(oper),
Err(s) => s,
};
break;
}
sel = cx.selected();
if sel != Selected::Waiting {
break;
}
}
if sel == Selected::Waiting {
let mut deadline: Option<Instant> = match timeout {
Timeout::Now => unreachable!(),
Timeout::Never => None,
Timeout::At(when) => Some(when),
};
for &(handle, _, _) in handles.iter() {
if let Some(x) = handle.deadline() {
deadline = deadline.map(|y| x.min(y)).or(Some(x));
}
}
sel = cx.wait_until(deadline);
}
for (handle, _, _) in handles.iter_mut().take(registered_count) {
handle.unwatch(Operation::hook::<&dyn SelectHandle>(handle));
}
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {}
Selected::Disconnected => {}
Selected::Operation(_) => {
for (handle, i, _) in handles.iter_mut() {
let oper = Operation::hook::<&dyn SelectHandle>(handle);
if sel == Selected::Operation(oper) {
return Some(*i);
}
}
}
}
None
});
if res.is_some() {
return res;
}
}
}
#[inline]
pub fn try_select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
is_biased: bool,
) -> Result<SelectedOperation<'a>, TrySelectError> {
match run_select(handles, Timeout::Now, is_biased) {
None => Err(TrySelectError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
index,
ptr,
_marker: PhantomData,
}),
}
}
#[inline]
pub fn select<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
is_biased: bool,
) -> SelectedOperation<'a> {
if handles.is_empty() {
panic!("no operations have been added to `Select`");
}
let (token, index, ptr) = run_select(handles, Timeout::Never, is_biased).unwrap();
SelectedOperation {
token,
index,
ptr,
_marker: PhantomData,
}
}
#[inline]
pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => select_deadline(handles, deadline, is_biased),
None => Ok(select(handles, is_biased)),
}
}
#[inline]
pub(crate) fn select_deadline<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
deadline: Instant,
is_biased: bool,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
match run_select(handles, Timeout::At(deadline), is_biased) {
None => Err(SelectTimeoutError),
Some((token, index, ptr)) => Ok(SelectedOperation {
token,
index,
ptr,
_marker: PhantomData,
}),
}
}
pub struct Select<'a> {
handles: Vec<(&'a dyn SelectHandle, usize, *const u8)>,
next_index: usize,
biased: bool,
}
unsafe impl Send for Select<'_> {}
unsafe impl Sync for Select<'_> {}
impl<'a> Select<'a> {
pub fn new() -> Select<'a> {
Select {
handles: Vec::with_capacity(4),
next_index: 0,
biased: false,
}
}
pub fn new_biased() -> Self {
Self {
biased: true,
..Default::default()
}
}
pub fn send<T>(&mut self, s: &'a Sender<T>) -> usize {
let i = self.next_index;
let ptr = s as *const Sender<_> as *const u8;
self.handles.push((s, i, ptr));
self.next_index += 1;
i
}
pub fn recv<T>(&mut self, r: &'a Receiver<T>) -> usize {
let i = self.next_index;
let ptr = r as *const Receiver<_> as *const u8;
self.handles.push((r, i, ptr));
self.next_index += 1;
i
}
pub fn remove(&mut self, index: usize) {
assert!(
index < self.next_index,
"index out of bounds; {} >= {}",
index,
self.next_index,
);
let i = self
.handles
.iter()
.enumerate()
.find(|(_, (_, i, _))| *i == index)
.expect("no operation with this index")
.0;
self.handles.swap_remove(i);
}
pub fn try_select(&mut self) -> Result<SelectedOperation<'a>, TrySelectError> {
try_select(&mut self.handles, self.biased)
}
pub fn select(&mut self) -> SelectedOperation<'a> {
select(&mut self.handles, self.biased)
}
pub fn select_timeout(
&mut self,
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_timeout(&mut self.handles, timeout, self.biased)
}
pub fn select_deadline(
&mut self,
deadline: Instant,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_deadline(&mut self.handles, deadline, self.biased)
}
pub fn try_ready(&mut self) -> Result<usize, TryReadyError> {
match run_ready(&mut self.handles, Timeout::Now, self.biased) {
None => Err(TryReadyError),
Some(index) => Ok(index),
}
}
pub fn ready(&mut self) -> usize {
if self.handles.is_empty() {
panic!("no operations have been added to `Select`");
}
run_ready(&mut self.handles, Timeout::Never, self.biased).unwrap()
}
pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
match Instant::now().checked_add(timeout) {
Some(deadline) => self.ready_deadline(deadline),
None => Ok(self.ready()),
}
}
pub fn ready_deadline(&mut self, deadline: Instant) -> Result<usize, ReadyTimeoutError> {
match run_ready(&mut self.handles, Timeout::At(deadline), self.biased) {
None => Err(ReadyTimeoutError),
Some(index) => Ok(index),
}
}
}
impl<'a> Clone for Select<'a> {
fn clone(&self) -> Select<'a> {
Select {
handles: self.handles.clone(),
next_index: self.next_index,
biased: self.biased,
}
}
}
impl<'a> Default for Select<'a> {
fn default() -> Select<'a> {
Select::new()
}
}
impl fmt::Debug for Select<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Select { .. }")
}
}
#[must_use]
pub struct SelectedOperation<'a> {
token: Token,
index: usize,
ptr: *const u8,
_marker: PhantomData<&'a ()>,
}
impl SelectedOperation<'_> {
pub fn index(&self) -> usize {
self.index
}
pub fn send<T>(mut self, s: &Sender<T>, msg: T) -> Result<(), SendError<T>> {
assert!(
s as *const Sender<T> as *const u8 == self.ptr,
"passed a sender that wasn't selected",
);
let res = unsafe { channel::write(s, &mut self.token, msg) };
mem::forget(self);
res.map_err(SendError)
}
pub fn recv<T>(mut self, r: &Receiver<T>) -> Result<T, RecvError> {
assert!(
r as *const Receiver<T> as *const u8 == self.ptr,
"passed a receiver that wasn't selected",
);
let res = unsafe { channel::read(r, &mut self.token) };
mem::forget(self);
res.map_err(|_| RecvError)
}
}
impl fmt::Debug for SelectedOperation<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("SelectedOperation { .. }")
}
}
impl Drop for SelectedOperation<'_> {
fn drop(&mut self) {
panic!("dropped `SelectedOperation` without completing the operation");
}
}