#[allow(unused_imports)]
use crate::collections::WeakCell;
#[allow(unused_imports)]
use crate::flavor::{Flavor, FlavorImpl};
#[cfg(feature = "trace_log")]
use crate::tokio_task_id;
use crate::trace_log;
use crate::waker::*;
use parking_lot::Mutex;
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::sync::{
atomic::{compiler_fence, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
};
use std::task::{Context, Poll};
pub(crate) type RegistryMultiSend<T> = RegistryMulti<*const T>;
pub(crate) type RegistryMultiRecv = RegistryMulti<()>;
pub(crate) trait Registry: Send + Sync + 'static {
type Waker: Send + Unpin + 'static + Debug;
fn get_waker_state(&self, o_waker: &Option<Self::Waker>, order: Ordering) -> u8;
#[inline(always)]
fn clear_wakers(&self, _waker: &Self::Waker) {}
fn close(&self);
#[inline(always)]
fn len(&self) -> usize {
0
}
#[inline(always)]
fn commit_waiting(&self, _o_waker: &Option<Self::Waker>) -> u8 {
WakerState::Init as u8
}
#[inline(always)]
fn cancel_waker(&self, o_waker: &mut Option<Self::Waker>) {
let _ = o_waker.take();
}
#[inline(always)]
fn abandon_waker(&self, _waker: &Self::Waker) -> Result<(), u8> {
Ok(())
}
}
pub(crate) trait RegistrySend<T>: Registry {
fn new() -> Self;
#[inline(always)]
fn use_direct_copy(&self) -> bool {
false
}
#[inline(always)]
fn reg_waker_blocking(
&self, _o_waker: &mut Option<<Self as Registry>::Waker>, _cache: &WakerCache<*const T>,
_payload: *const T,
) {
unreachable!();
}
#[inline(always)]
fn reg_waker_async(
&self, _ctx: &mut Context, _o_waker: &mut Option<<Self as Registry>::Waker>,
) -> Option<Poll<()>> {
unreachable!();
}
#[inline(always)]
fn cancel_reuse_waker(
&self, o_waker: &mut Option<<Self as Registry>::Waker>, state: WakerState,
) -> u8 {
let _ = o_waker.take();
state as u8
}
#[inline(always)]
fn fire<F>(&self, _flavor: &F) -> WakeResult
where
F: FlavorImpl<Item = T>,
{
WakeResult::Next
}
#[inline(always)]
fn cache_waker(
&self, _o_waker: Option<<Self as Registry>::Waker>, _cache: &WakerCache<*const T>,
) {
}
}
pub(crate) trait RegistryRecv: Registry {
fn new() -> Self;
#[inline(always)]
fn fire(&self) {}
#[inline(always)]
fn reg_waker_blocking(
&self, _o_waker: &mut Option<<Self as Registry>::Waker>, _cache: &WakerCache<()>,
) {
unreachable!();
}
#[inline(always)]
fn reg_waker_async(
&self, _ctx: &mut Context, _o_waker: &mut Option<<Self as Registry>::Waker>,
) -> Option<Poll<()>> {
unreachable!();
}
#[inline(always)]
fn cache_waker(&self, _o_waker: Option<<Self as Registry>::Waker>, _cache: &WakerCache<()>) {}
fn reg_select_waker(&self, channel_id: usize, waker: &Arc<SelectWaker>) -> bool;
#[inline(always)]
fn cancel_select_waker(&self, _waker: &Arc<SelectWaker>) {}
}
#[derive(Debug)]
pub struct RegistryDummy();
impl Registry for RegistryDummy {
type Waker = ();
#[inline(always)]
fn get_waker_state(&self, _o_waker: &Option<Self::Waker>, _order: Ordering) -> u8 {
unreachable!();
}
#[inline(always)]
fn close(&self) {}
}
impl<T> RegistrySend<T> for RegistryDummy {
#[inline(always)]
fn new() -> Self {
Self()
}
}
type SingleWaker = ArcWaker<()>;
pub struct RegistrySingle {
cell: WeakCell<WakerInner<()>>,
_tag: &'static str,
}
impl RegistrySingle {
#[inline(always)]
fn _fire(&self) {
if let Some(waker) = self.cell.pop() {
waker.wake();
trace_log!("{} wake", self._tag);
}
}
#[inline(always)]
fn _reg_waker_async(&self, ctx: &mut Context, o_waker: &mut Option<SingleWaker>) {
let waker = ArcWaker::<()>::new_async(ctx, ());
trace_log!("{}{:?}: reg {:?}", self._tag, tokio_task_id!(), waker);
self.cell.replace(waker.weak());
o_waker.replace(waker);
}
#[inline(always)]
fn _reg_waker_blocking(&self, o_waker: &mut Option<SingleWaker>) {
let waker = ArcWaker::<()>::new_blocking(());
trace_log!("{}{:?}: reg {:?}", self._tag, tokio_task_id!(), waker);
self.cell.replace(waker.weak());
o_waker.replace(waker);
}
}
impl Registry for RegistrySingle {
type Waker = SingleWaker;
#[inline(always)]
fn get_waker_state(&self, _o_waker: &Option<SingleWaker>, _order: Ordering) -> u8 {
if self.cell.is_empty() {
WakerState::Woken as u8
} else {
WakerState::Init as u8
}
}
#[inline(always)]
fn close(&self) {
self._fire();
}
}
impl<T> RegistrySend<T> for RegistrySingle {
#[inline(always)]
fn new() -> Self {
Self { cell: WeakCell::new(), _tag: "tx" }
}
#[inline(always)]
fn fire<F>(&self, _flavor: &F) -> WakeResult
where
F: FlavorImpl<Item = T>,
{
self._fire();
WakeResult::Next
}
#[inline(always)]
fn reg_waker_blocking(
&self, o_waker: &mut Option<SingleWaker>, _cache: &WakerCache<*const T>, _payload: *const T,
) {
self._reg_waker_blocking(o_waker);
}
#[inline(always)]
fn reg_waker_async(
&self, ctx: &mut Context, o_waker: &mut Option<SingleWaker>,
) -> Option<Poll<()>> {
self._reg_waker_async(ctx, o_waker);
None
}
}
impl RegistryRecv for RegistrySingle {
#[inline(always)]
fn new() -> Self {
Self { cell: WeakCell::new(), _tag: "rx" }
}
#[inline(always)]
fn fire(&self) {
self._fire();
}
#[inline(always)]
fn reg_waker_blocking(&self, o_waker: &mut Option<SingleWaker>, _cache: &WakerCache<()>) {
self._reg_waker_blocking(o_waker)
}
#[inline(always)]
fn reg_waker_async(
&self, ctx: &mut Context, o_waker: &mut Option<SingleWaker>,
) -> Option<Poll<()>> {
self._reg_waker_async(ctx, o_waker);
None
}
#[inline(always)]
fn reg_select_waker(&self, _channel_id: usize, waker: &Arc<SelectWaker>) -> bool {
trace_log!("{}: reg for select", self._tag);
self.cell.replace(waker.clone_weak());
false
}
}
struct RegistryMultiInner<P> {
queue: VecDeque<Weak<WakerInner<P>>>,
selectors: Vec<SelectWakerWrapper>,
seq: u32,
}
impl<P> RegistryMultiInner<P> {
#[inline(always)]
fn new() -> Self {
Self { queue: VecDeque::with_capacity(32), selectors: Vec::with_capacity(32), seq: 0 }
}
#[inline(always)]
fn check_select(&self) -> u8 {
if self.selectors.is_empty() {
0
} else {
MULTI_HAS_SELECT
}
}
#[inline(always)]
fn check_waker(&self) -> u8 {
if self.queue.is_empty() {
0
} else {
MULTI_HAS_WAKER
}
}
}
const MULTI_EMPTY: u8 = 0;
const MULTI_HAS_SELECT: u8 = 1;
const MULTI_HAS_WAKER: u8 = 2;
pub struct RegistryMulti<P> {
state: AtomicU8,
inner: Mutex<RegistryMultiInner<P>>,
_tag: &'static str,
}
impl<P: Copy> RegistryMulti<P> {
#[inline(always)]
fn reg_waker(&self, waker: &ArcWaker<P>) {
let weak = waker.weak();
{
let mut guard = self.inner.lock();
let seq = guard.seq.wrapping_add(1);
guard.seq = seq;
waker.set_seq(seq);
if guard.queue.is_empty() {
self.state.store(guard.check_select() | MULTI_HAS_WAKER, Ordering::SeqCst);
}
guard.queue.push_back(weak);
}
}
#[inline(always)]
fn _reg_waker_async(
&self, ctx: &mut Context, o_waker: &mut Option<ArcWaker<P>>, payload: P,
) -> Option<Poll<()>> {
if let Some(waker) = o_waker.as_ref() {
match waker.try_change_state(WakerState::Woken, WakerState::Init) {
Ok(_) => {
if waker.will_wake(ctx) {
self.reg_waker(waker);
return None;
}
}
Err(state) => {
if state < WakerState::Woken as u8 {
if waker.will_wake(ctx) {
trace_log!(
"{} {:?}: will_wake {:?}",
self._tag,
tokio_task_id!(),
waker
);
return Some(Poll::Pending);
} else {
if waker.get_state_relaxed() < WakerState::Woken as u8 {
self._clear_wakers(waker, true);
}
trace_log!(
"{} {:?}: drop waker {:?}",
self._tag,
tokio_task_id!(),
waker
);
}
} else if state == WakerState::Closed as u8 {
return Some(Poll::Ready(()));
} else {
panic!("state: impossible for async {:?}", state);
}
}
}
}
let waker = ArcWaker::<P>::new_async(ctx, payload);
self.reg_waker(&waker);
o_waker.replace(waker);
None
}
#[inline(always)]
fn _reg_waker_blocking(
&self, o_waker: &mut Option<ArcWaker<P>>, _cache: &WakerCache<P>, payload: P,
) {
if let Some(waker) = o_waker.as_ref() {
waker.reset_init();
self.reg_waker(waker);
trace_log!("{}{:?}: re-reg {:?}", self._tag, tokio_task_id!(), waker);
} else {
debug_assert!(o_waker.is_none());
let waker = ArcWaker::<P>::new_blocking(payload);
self.reg_waker(&waker);
trace_log!("{}{:?}: reg {:?}", self._tag, tokio_task_id!(), waker);
o_waker.replace(waker);
}
}
#[inline(always)]
fn pop_first(&self) -> Option<(ArcWaker<P>, Option<u32>)> {
let flag = self.state.load(Ordering::SeqCst);
if flag == MULTI_EMPTY {
return None;
}
{
let mut guard = self.inner.lock();
if flag & MULTI_HAS_SELECT > 0 {
for select in &guard.selectors {
select.wake();
}
}
if flag & MULTI_HAS_WAKER > 0 {
let mut has_pop = false;
loop {
if let Some(weak) = guard.queue.pop_front() {
has_pop = true;
if let Some(inner) = weak.upgrade() {
if guard.queue.is_empty() {
self.state.store(guard.check_select(), Ordering::SeqCst);
return Some((ArcWaker::from_arc(inner), None));
} else {
return Some((ArcWaker::from_arc(inner), Some(guard.seq)));
}
}
} else {
if has_pop {
self.state.store(guard.check_select(), Ordering::SeqCst);
}
return None;
}
}
}
None
}
}
#[inline(always)]
fn pop_again(&self) -> Option<ArcWaker<P>> {
let flag = self.state.load(Ordering::Acquire);
if flag == MULTI_EMPTY {
return None;
}
{
let mut guard = self.inner.lock();
let mut has_pop = false;
loop {
if let Some(weak) = guard.queue.pop_front() {
has_pop = true;
if let Some(inner) = weak.upgrade() {
if guard.queue.is_empty() {
self.state.store(guard.check_select(), Ordering::SeqCst);
}
return Some(ArcWaker::from_arc(inner));
}
} else {
if has_pop {
self.state.store(guard.check_select(), Ordering::SeqCst);
}
return None;
}
}
}
}
#[inline(always)]
fn _clear_wakers(&self, old_waker: &ArcWaker<P>, oneshot: bool) {
if self.state.load(Ordering::Acquire) & MULTI_HAS_WAKER == 0 {
return;
}
let old_seq = old_waker.get_seq();
macro_rules! process {
($guard: expr, $weak: expr) => {{
if let Some(waker) = $weak.upgrade() {
let _seq = waker.get_seq();
if _seq == old_seq {
trace_log!("{}: clear {:?} hit", self._tag, waker);
true
} else if _seq > old_seq {
$guard.queue.push_front($weak);
true
} else {
let state = waker.get_state();
if state < WakerState::Woken as u8 {
$guard.queue.push_front($weak);
true
} else {
if oneshot {
trace_log!("{}: cancel {:?} one {}", self._tag, waker, old_seq);
true
} else {
trace_log!("{}: cancel {:?}<{}", self._tag, waker, old_seq);
false
}
}
}
} else {
false
}
}};
}
let mut guard = self.inner.lock();
if let Some(weak) = guard.queue.pop_front() {
if process!(guard, weak) {
if guard.queue.is_empty() {
self.state.store(guard.check_select(), Ordering::SeqCst);
}
return;
}
loop {
if let Some(_weak) = guard.queue.pop_front() {
if process!(guard, _weak) {
if guard.queue.is_empty() {
self.state.store(guard.check_select(), Ordering::SeqCst);
}
return;
}
} else {
self.state.store(guard.check_select(), Ordering::SeqCst);
return;
}
}
}
}
#[inline(always)]
fn _cache_waker(_o_waker: Option<ArcWaker<P>>, _cache: &WakerCache<P>) {
}
}
impl<P: 'static + Copy> Registry for RegistryMulti<P> {
type Waker = ArcWaker<P>;
#[inline(always)]
fn get_waker_state(&self, o_waker: &Option<ArcWaker<P>>, order: Ordering) -> u8 {
if let Some(waker) = o_waker {
waker._get_state(order)
} else {
unreachable!();
}
}
#[inline(always)]
fn clear_wakers(&self, waker: &ArcWaker<P>) {
self._clear_wakers(waker, false);
}
#[inline(always)]
fn close(&self) {
let mut guard = self.inner.lock();
for selector in &guard.selectors {
selector.wake();
}
while let Some(weak) = guard.queue.pop_front() {
if let Some(waker) = weak.upgrade() {
let _r = waker.close_wake();
trace_log!("close {} wake {:?} {}", self._tag, waker, _r);
}
}
self.state.store(0, Ordering::SeqCst);
}
#[inline]
fn len(&self) -> usize {
let guard = self.inner.lock();
guard.queue.len()
}
#[inline(always)]
fn commit_waiting(&self, o_waker: &Option<ArcWaker<P>>) -> u8 {
if let Some(waker) = &o_waker {
waker.commit_waiting()
} else {
unreachable!();
}
}
#[inline(always)]
fn abandon_waker(&self, waker: &ArcWaker<P>) -> Result<(), u8> {
match waker.abandon() {
Ok(()) => {
trace_log!("{}: abandon cancel {:?}", self._tag, waker);
self.clear_wakers(waker);
Ok(())
}
Err(state) => Err(state),
}
}
#[inline(always)]
fn cancel_waker(&self, o_waker: &mut Option<ArcWaker<P>>) {
if let Some(waker) = o_waker.take() {
if waker.get_state_relaxed() >= WakerState::Woken as u8 {
return;
}
self._clear_wakers(&waker, true);
}
}
}
impl<T: 'static> RegistrySend<T> for RegistryMultiSend<T> {
#[inline(always)]
fn new() -> Self {
Self { inner: Mutex::new(RegistryMultiInner::new()), state: AtomicU8::new(0), _tag: "tx" }
}
#[inline(always)]
fn use_direct_copy(&self) -> bool {
self.state.load(Ordering::Relaxed) != MULTI_EMPTY
}
#[inline(always)]
fn reg_waker_blocking(
&self, o_waker: &mut Option<ArcWaker<*const T>>, cache: &WakerCache<*const T>,
payload: *const T,
) {
self._reg_waker_blocking(o_waker, cache, payload)
}
#[inline(always)]
fn reg_waker_async(
&self, ctx: &mut Context, o_waker: &mut Option<ArcWaker<*const T>>,
) -> Option<Poll<()>> {
self._reg_waker_async(ctx, o_waker, std::ptr::null_mut())
}
#[inline(always)]
fn cancel_reuse_waker(
&self, o_waker: &mut Option<ArcWaker<*const T>>, state: WakerState,
) -> u8 {
if let Some(waker) = o_waker.as_ref() {
let cur_state = waker.get_state();
if cur_state >= WakerState::Woken as u8 {
trace_log!("{}: cancel_reuse {:?} {}", self._tag, waker, cur_state);
if cur_state < state as u8 {
state as u8
} else {
cur_state
}
} else {
self._clear_wakers(waker, true);
let _ = o_waker.take();
state as u8
}
} else {
unreachable!();
}
}
#[inline(always)]
fn fire<F>(&self, _flavor: &F) -> WakeResult
where
F: FlavorImpl<Item = T>,
{
if let Some((waker, _last_seq)) = self.pop_first() {
let r = waker.wake();
trace_log!("wake {} {:?} {:?}", self._tag, waker, r);
if r.is_done() {
return r;
}
drop(waker);
if let Some(mut last_seq) = _last_seq {
last_seq = last_seq.wrapping_sub(1);
while let Some(_waker) = self.pop_again() {
let r = _waker.wake();
trace_log!("wake {} {:?} {:?}", self._tag, _waker, r);
if r.is_done() {
return r;
}
if _waker.get_seq() >= last_seq {
trace_log!("wake {} stop at {}", self._tag, last_seq);
return WakeResult::Next;
}
}
}
}
WakeResult::Next
}
#[inline(always)]
fn cache_waker(&self, o_waker: Option<ArcWaker<*const T>>, cache: &WakerCache<*const T>) {
Self::_cache_waker(o_waker, cache);
}
}
impl RegistryRecv for RegistryMultiRecv {
#[inline(always)]
fn new() -> Self {
Self { inner: Mutex::new(RegistryMultiInner::new()), state: AtomicU8::new(0), _tag: "rx" }
}
#[inline(always)]
fn reg_waker_blocking(&self, o_waker: &mut Option<ArcWaker<()>>, cache: &WakerCache<()>) {
self._reg_waker_blocking(o_waker, cache, ())
}
#[inline(always)]
fn reg_waker_async(
&self, ctx: &mut Context, o_waker: &mut Option<ArcWaker<()>>,
) -> Option<Poll<()>> {
self._reg_waker_async(ctx, o_waker, ())
}
#[inline(always)]
fn fire(&self) {
if let Some((waker, _last_seq)) = self.pop_first() {
let r = waker.wake();
trace_log!("wake {} {:?} {:?}", self._tag, waker, r);
if r.is_done() {
return;
}
drop(waker);
if let Some(mut last_seq) = _last_seq {
last_seq = last_seq.wrapping_sub(1);
while let Some(_waker) = self.pop_again() {
let r = _waker.wake();
trace_log!("wake {} {:?} {:?}", self._tag, _waker, r);
if r.is_done() {
return;
}
if _waker.get_seq() >= last_seq {
trace_log!("wake {} stop at {}", self._tag, last_seq);
return;
}
}
}
}
}
#[inline(always)]
fn cache_waker(&self, o_waker: Option<ArcWaker<()>>, cache: &WakerCache<()>) {
Self::_cache_waker(o_waker, cache);
}
#[inline(always)]
fn reg_select_waker(&self, channel_id: usize, waker: &Arc<SelectWaker>) -> bool {
trace_log!("{}: reg for select", self._tag);
let mut guard = self.inner.lock();
if guard.selectors.is_empty() {
self.state.store(guard.check_waker() | MULTI_HAS_SELECT, Ordering::SeqCst);
}
guard.selectors.push(SelectWaker::to_wrapper(waker.clone(), channel_id));
true
}
#[inline(always)]
fn cancel_select_waker(&self, waker: &Arc<SelectWaker>) {
let mut guard = self.inner.lock();
if let Some((i, _)) = guard.selectors.iter().enumerate().find(|&(_, entry)| entry.eq(waker))
{
guard.selectors.remove(i);
}
if guard.selectors.is_empty() {
self.state.store(guard.check_waker(), Ordering::SeqCst);
}
}
}
pub struct SelectWakerWrapper(Arc<SelectWaker>, usize);
impl SelectWakerWrapper {
#[inline(always)]
pub(crate) fn wake(&self) {
if let Some(waker) = self.0.cell.pop() {
trace_log!("rx: wake select");
self.0.hint.store(self.1, Ordering::Release);
waker.wake();
}
}
#[inline(always)]
pub(crate) fn eq(&self, waker: &Arc<SelectWaker>) -> bool {
Arc::ptr_eq(&self.0, waker)
}
}
impl Registry for SelectWakerWrapper {
type Waker = ArcWaker<()>;
#[inline(always)]
fn get_waker_state(&self, _o_waker: &Option<ArcWaker<()>>, _order: Ordering) -> u8 {
unreachable!();
}
#[inline(always)]
fn close(&self) {
self.0.close();
self.wake();
}
}
impl RegistryRecv for SelectWakerWrapper {
fn new() -> Self {
unreachable!();
}
#[inline(always)]
fn fire(&self) {
self.wake();
}
fn reg_select_waker(&self, _channel_id: usize, _waker: &Arc<SelectWaker>) -> bool {
unreachable!();
}
}
pub(crate) struct SelectWaker {
cell: WeakCell<WakerInner<()>>,
hint: AtomicUsize,
o_waker: UnsafeCell<Option<ArcWaker<()>>>,
opened_channels: AtomicUsize,
}
unsafe impl Send for SelectWaker {}
unsafe impl Sync for SelectWaker {}
impl SelectWaker {
#[inline(always)]
pub fn new() -> Self {
Self {
cell: WeakCell::new(),
hint: AtomicUsize::new(0),
o_waker: UnsafeCell::new(None),
opened_channels: AtomicUsize::new(0),
}
}
#[inline(always)]
pub fn init_blocking(&self) {
let weak = if let Some(waker) = self.get_waker().as_ref() {
waker.reset_init();
waker.weak()
} else {
let waker = ArcWaker::new_blocking(());
let weak = waker.weak();
self.get_waker().replace(waker);
weak
};
self.cell.replace(weak);
self.hint.store(0, Ordering::Release)
}
#[allow(dead_code)]
#[inline(always)]
pub fn init_async(&self, ctx: &mut Context) {
let waker = ArcWaker::new_async(ctx, ());
let weak = waker.weak();
self.get_waker().replace(waker);
self.cell.replace(weak);
self.hint.store(0, Ordering::Release)
}
#[inline(always)]
fn get_waker(&self) -> &mut Option<ArcWaker<()>> {
unsafe { &mut *self.o_waker.get() }
}
#[inline(always)]
fn clone_weak(&self) -> Weak<WakerInner<()>> {
self.get_waker().as_ref().unwrap().weak()
}
#[inline(always)]
pub fn add_opened(&self) {
self.opened_channels.fetch_add(1, Ordering::SeqCst);
}
#[inline(always)]
pub fn get_opened_count(&self) -> usize {
self.opened_channels.load(Ordering::SeqCst)
}
#[inline(always)]
pub fn to_wrapper(self: Arc<SelectWaker>, idx: usize) -> SelectWakerWrapper {
SelectWakerWrapper(self, idx)
}
#[inline(always)]
pub fn get_hint(&self) -> usize {
compiler_fence(Ordering::AcqRel);
self.hint.load(Ordering::Relaxed)
}
#[inline(always)]
pub fn close(&self) {
self.opened_channels.fetch_sub(1, Ordering::SeqCst);
}
#[inline(always)]
pub fn get_waker_state(&self, order: Ordering) -> u8 {
self.get_waker().as_ref().unwrap()._get_state(order)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::waker::ArcWaker;
#[test]
fn print_waker_registry_size() {
use std::mem::size_of;
println!("RegistryMultiSend<usize> size {}", size_of::<RegistryMultiSend<usize>>());
println!("RegistryMultiRecv size {}", size_of::<RegistryMultiRecv>());
println!("RegistrySingle size {}", size_of::<RegistrySingle>());
println!("RegistryMulti<()> size {}", size_of::<RegistryMultiRecv>());
}
#[test]
fn test_registry_multi_pop() {
let reg = RegistryMultiRecv::new();
let waker1 = ArcWaker::new_blocking(());
assert_eq!(reg.len(), 0);
reg.reg_waker(&waker1);
assert_eq!(waker1.get_state(), WakerState::Init as u8);
assert_eq!(waker1.get_seq(), 1);
assert_eq!(reg.len(), 1);
let waker2 = ArcWaker::new_blocking(());
reg.reg_waker(&waker2);
waker2.commit_waiting();
assert_eq!(waker2.get_seq(), 2);
assert_eq!(reg.len(), 2);
assert_eq!(waker2.get_seq(), waker1.get_seq() + 1);
assert_eq!(waker2.get_state(), WakerState::Waiting as u8);
if let Some((w, seq)) = reg.pop_first() {
assert!(w.wake() == WakeResult::Next);
assert!(seq.is_some());
}
assert_eq!(waker1.get_state(), WakerState::Woken as u8);
assert_eq!(reg.len(), 1);
if let Some(w) = reg.pop_again() {
assert!(w.wake() == WakeResult::Woken);
}
assert_eq!(waker2.get_state(), WakerState::Woken as u8);
assert_eq!(reg.len(), 0);
}
#[test]
fn test_registry_multi_clear_waiting() {
let reg = RegistryMultiRecv::new();
let waker3 = ArcWaker::new_blocking(());
reg.reg_waker(&waker3);
waker3.commit_waiting();
assert_eq!(waker3.get_state(), WakerState::Waiting as u8);
let waker4 = ArcWaker::new_blocking(());
reg.reg_waker(&waker4); assert_eq!(waker4.get_state(), WakerState::Init as u8);
let num_workers = reg.len();
reg.clear_wakers(&waker4);
assert_eq!(reg.len(), num_workers);
for _ in 0..10 {
let _waker = ArcWaker::new_blocking(());
reg.reg_waker(&_waker);
}
let num_workers = reg.len();
assert_eq!(reg.len(), num_workers);
}
#[test]
fn test_registry_multi_clear_oneshot() {
let reg = RegistryMultiRecv::new();
let waker1 = ArcWaker::new_blocking(());
reg.reg_waker(&waker1);
assert_eq!(waker1.get_state(), WakerState::Init as u8);
let waker2 = ArcWaker::new_blocking(());
reg.reg_waker(&waker2); waker2.commit_waiting();
assert_eq!(waker2.get_state(), WakerState::Waiting as u8);
for _ in 0..10 {
let _waker = ArcWaker::new_blocking(());
reg.reg_waker(&_waker);
}
let num_workers = reg.len();
println!("clear waker2 oneshot seq {}", waker2.get_seq());
reg.cancel_waker(&mut Some(waker2));
assert_eq!(reg.len(), num_workers); reg.cancel_waker(&mut Some(waker1));
assert_eq!(reg.len(), num_workers - 1); }
#[test]
fn test_registry_multi_clear() {
let reg = RegistryMultiRecv::new();
let waker1 = ArcWaker::new_blocking(());
reg.reg_waker(&waker1);
assert_eq!(waker1.get_state(), WakerState::Init as u8);
let waker2 = ArcWaker::new_blocking(());
reg.reg_waker(&waker2); drop(waker2); for _ in 0..10 {
let _waker = ArcWaker::new_blocking(());
reg.reg_waker(&_waker);
}
let waker3 = ArcWaker::new_blocking(());
reg.reg_waker(&waker3);
let _num_workers = reg.len(); println!("clear waker3 seq={}", waker3.get_seq());
reg.clear_wakers(&waker3); assert_eq!(reg.len(), 13);
reg.clear_wakers(&waker1);
assert_eq!(reg.len(), 12);
reg.clear_wakers(&waker3);
assert_eq!(reg.len(), 0);
}
#[test]
fn test_registry_multi_close() {
let reg = RegistryMultiRecv::new();
println!("test close");
for _ in 0..10 {
let _waker = ArcWaker::new_blocking(());
reg.reg_waker(&_waker);
}
assert!(reg.len() > 0);
reg.close();
assert_eq!(reg.len(), 0);
}
}