use std::{
collections::VecDeque,
ptr::NonNull,
sync::atomic::{AtomicUsize, Ordering},
};
use futures::task::AtomicWaker;
/// Alias for the crate's own mutex type (`crate::mutex::Mutex`).
/// In this file its `lock()` result is returned directly as the guard.
type Mutex<T> = crate::mutex::Mutex<T>;
/// Alias for the guard type paired with the crate's mutex.
type MutexGuard<'a, T> = crate::mutex::MutexGuard<'a, T>;
/// Shared state that bundles the mutex-protected signal queues with two atomic counters.
///
/// NOTE(review): `tx_cnt` / `rx_cnt` presumably count live sender and receiver
/// handles — confirm against the callers of the `inc_*` / `dec_*` methods.
pub struct Inner {
// Queued send/recv signals; access goes through the mutex.
signal_queues: Mutex<SignalQueues>,
// Counter adjusted by `inc_tx` / `dec_tx`.
tx_cnt: AtomicUsize,
// Counter adjusted by `inc_rx` / `dec_rx`.
rx_cnt: AtomicUsize,
}
impl Default for Inner {
fn default() -> Self {
Self {
signal_queues: Mutex::new(SignalQueues::default()),
tx_cnt: AtomicUsize::new(1),
rx_cnt: AtomicUsize::new(1),
}
}
}
impl Inner {
#[inline(always)]
pub fn signal_queues(&self) -> MutexGuard<'_, SignalQueues> {
self.signal_queues.lock()
}
#[inline(always)]
pub fn inc_tx(&self) -> usize {
self.tx_cnt.fetch_add(1, Ordering::Release)
}
#[inline(always)]
pub fn inc_rx(&self) -> usize {
self.rx_cnt.fetch_add(1, Ordering::Release)
}
#[inline(always)]
pub fn dec_tx(&self) -> usize {
self.tx_cnt.fetch_sub(1, Ordering::AcqRel)
}
#[inline(always)]
pub fn dec_rx(&self) -> usize {
self.rx_cnt.fetch_sub(1, Ordering::AcqRel)
}
}
/// Pair of FIFO queues holding pending send and receive signals.
///
/// Each entry is a `usize`; the pop methods reinterpret it as the address of an
/// `AtomicWaker`, so only such addresses should be queued.
#[derive(Default)]
pub struct SignalQueues {
// Signals queued via `add_send`, popped front-first by `pop_send`.
send_q: VecDeque<usize>,
// Signals queued via `add_recv`, popped front-first by `pop_recv`.
recv_q: VecDeque<usize>,
}
impl SignalQueues {
    /// Drops the earliest entry of the send queue that equals `signal`.
    ///
    /// Returns `true` if a matching entry was removed, `false` if none was queued.
    #[inline(always)]
    pub fn remove_send(&mut self, signal: usize) -> bool {
        Self::remove_first(&mut self.send_q, signal)
    }

    /// Drops the earliest entry of the receive queue that equals `signal`.
    ///
    /// Returns `true` if a matching entry was removed, `false` if none was queued.
    #[inline(always)]
    pub fn remove_recv(&mut self, signal: usize) -> bool {
        Self::remove_first(&mut self.recv_q, signal)
    }

    /// Appends `signal` to the back of the send queue.
    #[inline(always)]
    pub fn add_send(&mut self, signal: usize) {
        let Self { send_q, .. } = self;
        send_q.push_back(signal);
    }

    /// Appends `signal` to the back of the receive queue.
    #[inline(always)]
    pub fn add_recv(&mut self, signal: usize) {
        let Self { recv_q, .. } = self;
        recv_q.push_back(signal);
    }

    /// Takes the oldest send signal, if any, and wraps it as a [`WakerPtr`].
    #[inline(always)]
    pub fn pop_send(&mut self) -> Option<WakerPtr> {
        let Self { send_q, .. } = self;
        Self::pop_waker(send_q)
    }

    /// Takes the oldest receive signal, if any, and wraps it as a [`WakerPtr`].
    #[inline(always)]
    pub fn pop_recv(&mut self) -> Option<WakerPtr> {
        let Self { recv_q, .. } = self;
        Self::pop_waker(recv_q)
    }

    /// Pops the front of `q` and reinterprets the value as an `AtomicWaker` address.
    ///
    /// Returns `None` when `q` is empty.
    ///
    /// # Panics
    ///
    /// Panics if the popped value is `0`, since that cannot form a `NonNull` pointer.
    #[inline(always)]
    pub fn pop_waker(q: &mut VecDeque<usize>) -> Option<WakerPtr> {
        let signal = q.pop_front()?;
        let raw = signal as *mut AtomicWaker;
        let sig_ptr = NonNull::new(raw).unwrap();
        Some(WakerPtr::new(sig_ptr))
    }

    /// Removes the first occurrence of `signal` from `q`, reporting whether one was found.
    #[inline(always)]
    fn remove_first(q: &mut VecDeque<usize>, signal: usize) -> bool {
        let hit = q.iter().position(|&queued| queued == signal);
        if let Some(idx) = hit {
            // Only the first match goes; later duplicates stay queued.
            q.remove(idx);
        }
        hit.is_some()
    }
}
/// Thin wrapper around a non-null raw pointer to an `AtomicWaker`.
///
/// The wrapper does not own the pointee and does not track its lifetime.
pub struct WakerPtr {
// Raw, non-null address of the waker; dereferenced by `as_ref`.
ptr: NonNull<AtomicWaker>,
}
impl WakerPtr {
#[inline(always)]
pub fn new(ptr: NonNull<AtomicWaker>) -> Self {
Self { ptr }
}
#[inline(always)]
pub fn as_ref(&self) -> &AtomicWaker {
unsafe { self.ptr.as_ref() }
}
}
impl std::fmt::Debug for WakerPtr {
    /// Formats as `WakerPtr(<address>)`, printing the pointer's numeric address as a `usize`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let addr = self.ptr.as_ptr() as usize;
        f.write_fmt(format_args!("WakerPtr({:?})", addr))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::task::AtomicWaker;
    use std::sync::Arc;

    /// Allocates an `AtomicWaker` behind an `Arc`; the address stays valid for as long as
    /// the test keeps the `Arc` alive.
    fn create_mock_waker() -> Arc<AtomicWaker> {
        Arc::new(AtomicWaker::new())
    }

    /// Converts a waker reference into the address form stored in the queues.
    fn waker_to_usize(waker: &AtomicWaker) -> usize {
        waker as *const AtomicWaker as usize
    }

    /// Address of the waker a popped `WakerPtr` refers to.
    fn popped_addr(ptr: &WakerPtr) -> usize {
        waker_to_usize(WakerPtr::as_ref(ptr))
    }

    #[test]
    fn test_inner_default() {
        let inner = Inner::default();
        let signal_queues = inner.signal_queues();
        assert!(signal_queues.send_q.is_empty());
        assert!(signal_queues.recv_q.is_empty());
    }

    #[test]
    fn test_inner_counters_report_previous_value() {
        let inner = Inner::default();
        // Both counters start at 1 and every operation returns the value before the update.
        assert_eq!(inner.inc_tx(), 1);
        assert_eq!(inner.dec_tx(), 2);
        assert_eq!(inner.dec_tx(), 1);
        assert_eq!(inner.inc_rx(), 1);
        assert_eq!(inner.dec_rx(), 2);
        assert_eq!(inner.dec_rx(), 1);
    }

    #[test]
    fn test_signal_queues_basic_operations() {
        let mut queues = SignalQueues::default();
        let waker1 = create_mock_waker();
        let waker2 = create_mock_waker();
        let addr1 = waker_to_usize(&waker1);
        let addr2 = waker_to_usize(&waker2);

        queues.add_send(addr1);
        queues.add_recv(addr2);
        assert_eq!(queues.send_q.len(), 1);
        assert_eq!(queues.recv_q.len(), 1);

        assert!(queues.remove_send(addr1));
        // A second removal finds nothing, and addr2 lives only in the recv queue.
        assert!(!queues.remove_send(addr1));
        assert!(!queues.remove_send(addr2));
        assert_eq!(queues.send_q.len(), 0);
        assert_eq!(queues.recv_q.len(), 1);

        let popped = queues.pop_recv().expect("recv queue should hold one waker");
        assert_eq!(popped_addr(&popped), addr2);
        assert!(queues.recv_q.is_empty());
        assert!(queues.pop_send().is_none());
    }

    #[test]
    fn test_signal_queues_fifo_order() {
        let mut queues = SignalQueues::default();
        let waker1 = create_mock_waker();
        let waker2 = create_mock_waker();
        let waker3 = create_mock_waker();
        let addr1 = waker_to_usize(&waker1);
        let addr2 = waker_to_usize(&waker2);
        let addr3 = waker_to_usize(&waker3);

        queues.add_send(addr1);
        queues.add_send(addr2);
        queues.add_send(addr3);

        // Entries must come back in exactly the order they were queued.
        for expected in [addr1, addr2, addr3] {
            let popped = queues
                .pop_send()
                .expect("send queue should still hold a waker");
            assert_eq!(popped_addr(&popped), expected);
        }
        assert!(queues.send_q.is_empty());
        assert!(queues.pop_send().is_none());
    }

    #[test]
    fn test_waker_ptr() {
        let waker = create_mock_waker();
        let addr = waker_to_usize(&waker);
        let sig_ptr: NonNull<AtomicWaker> = NonNull::new(addr as *mut AtomicWaker).unwrap();
        let waker_ptr = WakerPtr::new(sig_ptr);
        let waker_ref = waker_ptr.as_ref();
        assert_eq!(
            waker_ref as *const AtomicWaker,
            &*waker as *const AtomicWaker
        );
        let debug_str = format!("{:?}", waker_ptr);
        assert!(debug_str.contains("WakerPtr"));
    }

    #[test]
    fn test_signal_queues_remove_duplicates() {
        let mut queues = SignalQueues::default();
        let waker = create_mock_waker();
        let addr = waker_to_usize(&waker);

        queues.add_send(addr);
        queues.add_send(addr);

        // Each call removes a single occurrence only.
        assert!(queues.remove_send(addr));
        assert_eq!(queues.send_q.len(), 1);
        assert!(queues.remove_send(addr));
        assert!(queues.send_q.is_empty());
        assert!(!queues.remove_send(addr));
    }

    #[test]
    fn test_pop_returns_valid_waker() {
        let mut queues = SignalQueues::default();
        let waker = create_mock_waker();
        let addr = waker_to_usize(&waker);
        queues.add_send(addr);

        let popped_waker_ptr = queues.pop_send();
        assert!(popped_waker_ptr.is_some());
        let waker_ptr = popped_waker_ptr.unwrap();
        let waker_ref = waker_ptr.as_ref();
        assert_eq!(
            waker_ref as *const AtomicWaker,
            &*waker as *const AtomicWaker
        );

        // Exercise the waker through both the popped pointer and the original handle.
        let task = futures::task::Waker::noop();
        waker_ref.register(&task);
        waker_ref.wake();
        let original_waker_ref = &*waker;
        original_waker_ref.register(&task);
        waker_ref.register(&task);
        original_waker_ref.wake();
        waker_ref.wake();
        assert_eq!(
            waker_ref as *const AtomicWaker,
            original_waker_ref as *const AtomicWaker
        );
    }
}