use std::sync::mpsc;
use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
use super::ping::{make_ping, Ping, PingError, PingSource};
/// The events a [`Channel`] hands to its event-loop callback.
#[derive(Debug)]
pub enum Event<T> {
/// A value was successfully taken out of the underlying `mpsc` receiver.
Msg(T),
/// The underlying receiver reported `TryRecvError::Disconnected`, i.e. every
/// sender has been dropped and no buffered message remains.
Closed,
}
/// The sending half of the unbounded channel returned by [`channel`].
///
/// Bundles a standard `mpsc::Sender` with the `Ping` handle that is pinged
/// whenever the receiving [`Channel`] should look at its queue.
#[derive(Debug)]
pub struct Sender<T> {
// Carries the actual values to the `Channel`'s receiver.
sender: mpsc::Sender<T>,
// Pinged after every successful send and when this handle is dropped.
ping: Ping,
}
impl<T> Clone for Sender<T> {
#[cfg_attr(feature = "nightly_coverage", coverage(off))]
fn clone(&self) -> Sender<T> {
Sender {
sender: self.sender.clone(),
ping: self.ping.clone(),
}
}
}
impl<T> Sender<T> {
    /// Queues `t` on the channel and pings the receiving side.
    ///
    /// # Errors
    ///
    /// Returns the `mpsc::SendError` (which gives `t` back) when the
    /// underlying send fails; in that case no ping is emitted.
    pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
        self.sender.send(t)?;
        // Only signal once the value is actually in the queue.
        self.ping.ping();
        Ok(())
    }
}
impl<T> Drop for Sender<T> {
/// Pings once more when a sender handle goes away.
fn drop(&mut self) {
// Ping on drop so the receiving side gets a chance to observe that the
// channel may now be disconnected and report `Event::Closed`.
self.ping.ping();
}
}
/// The sending half of the bounded channel returned by [`sync_channel`].
///
/// Bundles a standard `mpsc::SyncSender` with the `Ping` handle that is
/// pinged whenever the receiving [`Channel`] should look at its queue.
#[derive(Debug)]
pub struct SyncSender<T> {
// Carries the actual values to the `Channel`'s receiver (bounded queue).
sender: mpsc::SyncSender<T>,
// Pinged by `send` / `try_send`; see those methods for the exact rules.
ping: Ping,
}
impl<T> Clone for SyncSender<T> {
#[cfg_attr(feature = "nightly_coverage", coverage(off))]
fn clone(&self) -> SyncSender<T> {
SyncSender {
sender: self.sender.clone(),
ping: self.ping.clone(),
}
}
}
impl<T> SyncSender<T> {
pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
let ret = self.try_send(t);
match ret {
Ok(()) => Ok(()),
Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()),
Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)),
}
}
pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
let ret = self.sender.try_send(t);
if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret {
self.ping.ping();
}
ret
}
}
/// The receiving half of a channel, usable as an [`EventSource`].
///
/// Created by [`channel`] or [`sync_channel`].
#[derive(Debug)]
pub struct Channel<T> {
// Standard receiver that the senders' values arrive on.
receiver: mpsc::Receiver<T>,
// Ping source all event-source operations are delegated to; its callback
// is where the receiver gets drained.
source: PingSource,
}
// SAFETY: NOTE(review): no justification is recorded in this file.
// `mpsc::Receiver<T>` is already `Send` when `T: Send`, so this manual impl
// presumably exists only because `PingSource` is not (or was not) automatically
// `Send` — confirm against the `ping` module that moving a `PingSource` to
// another thread is sound, or drop this impl if the auto trait now applies.
unsafe impl<T: Send> Send for Channel<T> {}
impl<T> Channel<T> {
/// Proxy for `mpsc::Receiver::recv`, for draining the channel manually.
///
/// Blocks the calling thread until a value is available, and returns
/// `mpsc::RecvError` once the channel is disconnected and empty.
/// Values taken this way are not reported through the event-source callback.
pub fn recv(&self) -> Result<T, mpsc::RecvError> {
self.receiver.recv()
}
/// Proxy for `mpsc::Receiver::try_recv`, for draining the channel manually.
///
/// Never blocks: returns `TryRecvError::Empty` when nothing is queued and
/// `TryRecvError::Disconnected` when the channel is disconnected and empty.
/// Values taken this way are not reported through the event-source callback.
pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
self.receiver.try_recv()
}
}
/// Creates an unbounded channel whose receiving end is an event source.
///
/// Returns the [`Sender`] half and the [`Channel`] half; both share one
/// freshly created ping pair.
///
/// # Panics
///
/// Panics with `"Failed to create a Ping."` if `make_ping` returns an error.
pub fn channel<T>() -> (Sender<T>, Channel<T>) {
    let (msg_tx, msg_rx) = mpsc::channel();
    let (waker, wake_source) = make_ping().expect("Failed to create a Ping.");

    let tx = Sender {
        sender: msg_tx,
        ping: waker,
    };
    let rx = Channel {
        receiver: msg_rx,
        source: wake_source,
    };
    (tx, rx)
}
/// Creates a bounded channel whose receiving end is an event source.
///
/// `bound` is forwarded unchanged to `mpsc::sync_channel` as the queue
/// capacity. Returns the [`SyncSender`] half and the [`Channel`] half; both
/// share one freshly created ping pair.
///
/// # Panics
///
/// Panics with `"Failed to create a Ping."` if `make_ping` returns an error.
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
    let (msg_tx, msg_rx) = mpsc::sync_channel(bound);
    let (waker, wake_source) = make_ping().expect("Failed to create a Ping.");

    let tx = SyncSender {
        sender: msg_tx,
        ping: waker,
    };
    let rx = Channel {
        receiver: msg_rx,
        source: wake_source,
    };
    (tx, rx)
}
impl<T> EventSource for Channel<T> {
    type Event = Event<T>;
    type Metadata = ();
    type Ret = ();
    type Error = ChannelError;

    /// Drains the receiver from inside the ping source's callback.
    ///
    /// Each queued value is passed to `callback` as [`Event::Msg`]. Draining
    /// stops when the queue is empty, or after a single [`Event::Closed`] has
    /// been delivered because the receiver reported disconnection.
    ///
    /// # Errors
    ///
    /// Any error from the inner ping source is wrapped in [`ChannelError`].
    fn process_events<C>(
        &mut self,
        readiness: Readiness,
        token: Token,
        mut callback: C,
    ) -> Result<PostAction, Self::Error>
    where
        C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        // Borrow the receiver separately so the closure does not need `self`
        // while `self.source` is mutably borrowed.
        let rx = &self.receiver;
        let outcome = self.source.process_events(readiness, token, |(), &mut ()| {
            loop {
                let event = match rx.try_recv() {
                    Ok(msg) => Event::Msg(msg),
                    // Nothing left to deliver for now.
                    Err(mpsc::TryRecvError::Empty) => break,
                    Err(mpsc::TryRecvError::Disconnected) => {
                        callback(Event::Closed, &mut ());
                        break;
                    }
                };
                callback(event, &mut ());
            }
        });
        outcome.map_err(ChannelError)
    }

    /// Registers the inner ping source with `poll`.
    fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
        let Self { source, .. } = self;
        source.register(poll, token_factory)
    }

    /// Re-registers the inner ping source with `poll`.
    fn reregister(
        &mut self,
        poll: &mut Poll,
        token_factory: &mut TokenFactory,
    ) -> crate::Result<()> {
        let Self { source, .. } = self;
        source.reregister(poll, token_factory)
    }

    /// Unregisters the inner ping source from `poll`.
    fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
        let Self { source, .. } = self;
        source.unregister(poll)
    }
}
/// Error returned by [`Channel`]'s `process_events`.
///
/// A transparent wrapper around the `PingError` produced by the inner ping
/// source; its message and source are forwarded from the wrapped error.
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct ChannelError(PingError);
// Tests driving both channel flavours through a real event loop.
#[cfg(test)]
mod tests {
use super::*;
// Unbounded channel: nothing is reported before a send, a send yields
// `Event::Msg`, and dropping the sender yields `Event::Closed`.
#[test]
fn basic_channel() {
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let (tx, rx) = channel::<()>();
// (message seen, closure seen)
let mut got = (false, false);
let _channel_token = handle
.insert_source(rx, move |evt, &mut (), got: &mut (bool, bool)| match evt {
Event::Msg(()) => {
got.0 = true;
}
Event::Closed => {
got.1 = true;
}
})
.unwrap();
// Nothing has been sent yet, so nothing should be dispatched.
event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut got)
.unwrap();
assert_eq!(got, (false, false));
// A single send must surface as a message, without closing.
tx.send(()).unwrap();
event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut got)
.unwrap();
assert_eq!(got, (true, false));
// Dropping the only sender must surface as closure.
::std::mem::drop(tx);
event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut got)
.unwrap();
assert_eq!(got, (true, true));
}
// Bounded channel with capacity 2: checks the full-queue error from
// `try_send`, message counting, and closure after the sender is dropped.
#[test]
fn basic_sync_channel() {
let mut event_loop = crate::EventLoop::try_new().unwrap();
let handle = event_loop.handle();
let (tx, rx) = sync_channel::<()>(2);
// (number of messages seen, closure seen)
let mut received = (0, false);
let _channel_token = handle
.insert_source(
rx,
move |evt, &mut (), received: &mut (u32, bool)| match evt {
Event::Msg(()) => {
received.0 += 1;
}
Event::Closed => {
received.1 = true;
}
},
)
.unwrap();
// Nothing has been sent yet, so nothing should be dispatched.
event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut received)
.unwrap();
assert_eq!(received.0, 0);
assert!(!received.1);
// Fill the queue to its bound; a third non-blocking send must fail.
tx.send(()).unwrap();
tx.send(()).unwrap();
assert!(tx.try_send(()).is_err());
// Both queued messages are delivered in one dispatch.
event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut received)
.unwrap();
assert_eq!(received.0, 2);
assert!(!received.1);
// One more message followed by dropping the sender: the same dispatch
// must deliver the message and then the closure.
tx.send(()).unwrap();
std::mem::drop(tx);
event_loop
.dispatch(Some(::std::time::Duration::ZERO), &mut received)
.unwrap();
assert_eq!(received.0, 3);
assert!(received.1);
}
}