use crate::traits::GuardRecovery;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
/// Sending half of a signal channel.
///
/// Holds a shared handle to the mutex-protected channel state that the
/// receiving side reads from.
pub struct SignalSender<T> {
    /// Channel state shared with the receiver(s).
    inner: Arc<Mutex<SignalChannel<T>>>,
}
/// Receiving half of a signal channel.
///
/// Each receiver carries an `id`. Only the receiver whose `id` matches the
/// channel's `active_receiver_id` gets messages; futures created by any other
/// receiver resolve to `None`.
pub struct SignalReceiver<T> {
    /// Channel state shared with the sender and with other receivers.
    inner: Arc<Mutex<SignalChannel<T>>>,
    /// Identifier compared against the channel's `active_receiver_id`.
    id: usize,
}
/// Mutable state shared between the sender and the receivers.
struct SignalChannel<T> {
    /// Messages that have been sent but not yet received, oldest first.
    queue: VecDeque<T>,
    /// Waker registered by a receive future that found the queue empty.
    waker: Option<Waker>,
    /// Id of the receiver that is currently allowed to take messages.
    active_receiver_id: usize,
}
impl<T> SignalSender<T> {
    /// Appends `msg` to the channel queue and wakes the waiting receiver task, if any.
    ///
    /// The registered waker is taken out of the channel state, so each
    /// registration is fired at most once.
    pub fn send(&self, msg: T) {
        let mut channel = self.inner.lock().recover();
        channel.queue.push_back(msg);
        let pending_waker = channel.waker.take();

        // Release the lock before waking: the woken task locks this same
        // (non-reentrant) mutex when it is polled, which may happen inline
        // or immediately on another thread.
        drop(channel);

        if let Some(waker) = pending_waker {
            waker.wake();
        }
    }
}
impl<T> SignalReceiver<T> {
    /// Returns a future that yields the next queued message.
    ///
    /// The future resolves to `Some(msg)` once a message is available, or to
    /// `None` if this receiver's id is not the channel's active receiver id
    /// when the future is polled.
    pub fn recv(&self) -> impl Future<Output = Option<T>> {
        let inner = Arc::clone(&self.inner);
        let receiver_id = self.id;
        RecvFuture { inner, receiver_id }
    }
}
impl<T> Clone for SignalReceiver<T> {
    /// Creates a new receiver that takes over as the channel's active receiver.
    ///
    /// The clone gets the next receiver id and that id is recorded as the
    /// active one, so receive futures belonging to earlier receivers resolve
    /// to `None` from then on. Any task currently registered as waiting is
    /// woken so it can observe the change.
    fn clone(&self) -> Self {
        let mut channel = self.inner.lock().recover();
        let id = channel.active_receiver_id + 1;
        channel.active_receiver_id = id;
        let displaced_waker = channel.waker.take();

        // Release the lock before waking: the woken task locks this same
        // (non-reentrant) mutex when it is polled.
        drop(channel);

        if let Some(waker) = displaced_waker {
            waker.wake();
        }

        SignalReceiver {
            inner: Arc::clone(&self.inner),
            id,
        }
    }
}
/// Future that resolves to the next message for one particular receiver.
struct RecvFuture<T> {
    /// Channel state shared with the sender and the receivers.
    inner: Arc<Mutex<SignalChannel<T>>>,
    /// Id of the receiver this future was created for.
    receiver_id: usize,
}
impl<T> Future for RecvFuture<T> {
    type Output = Option<T>;

    /// Polls for the next message.
    ///
    /// * `Ready(None)` when this future's receiver id is not the channel's
    ///   active receiver id.
    /// * `Ready(Some(msg))` when a message was queued; if more messages remain,
    ///   the current task is woken again.
    /// * `Pending` when the queue is empty; the task's waker is stored in the
    ///   channel state so a later wake-up can reach it.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut channel = self.inner.lock().recover();

        // Another receiver is the active one: nothing will be delivered here.
        if channel.active_receiver_id != self.receiver_id {
            return Poll::Ready(None);
        }

        match channel.queue.pop_front() {
            Some(msg) => {
                let more_queued = !channel.queue.is_empty();
                // Release the lock before waking so the wake-up never runs
                // while the channel mutex is held.
                drop(channel);
                if more_queued {
                    cx.waker().wake_by_ref();
                }
                Poll::Ready(Some(msg))
            }
            None => {
                // Register (or replace) the waker to be notified later.
                channel.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
/// Creates a connected [`SignalSender`] / [`SignalReceiver`] pair.
///
/// Both halves share one channel state that starts with an empty queue and no
/// registered waker. The returned receiver has id `0`, which is also the
/// channel's initial active receiver id.
#[doc(hidden)]
pub fn signal_channel<T>() -> (SignalSender<T>, SignalReceiver<T>) {
    const INITIAL_RECEIVER_ID: usize = 0;

    let shared = Arc::new(Mutex::new(SignalChannel {
        queue: VecDeque::new(),
        waker: None,
        active_receiver_id: INITIAL_RECEIVER_ID,
    }));

    (
        SignalSender {
            inner: Arc::clone(&shared),
        },
        SignalReceiver {
            inner: shared,
            id: INITIAL_RECEIVER_ID,
        },
    )
}