use crate::dispatcher::*;
use derive_more::{Deref, DerefMut, From};
use futures::sink::{Sink, SinkExt};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A newtype wrapper around a value of type `T`.
///
/// When `T` implements [`Sink`], this file implements both [`Sink`] and
/// [`Dispatcher`] for the wrapper, so a sink can be used wherever a dispatcher
/// is expected.
///
/// The wrapped value is the public field `0`. It is marked `#[pin]`, so the
/// projection generated by `pin_project` hands it out as `Pin<&mut T>`.
/// The derives additionally provide conversion from `T` (`From`) and access
/// to the wrapped value through `Deref` / `DerefMut`.
#[pin_project]
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Hash, From, Deref, DerefMut)]
pub struct AsyncDispatcher<T>(#[pin] pub T);
impl<A, T> Dispatcher<A> for AsyncDispatcher<T>
where
T: Sink<A> + Unpin,
{
type Output = Result<(), T::Error>;
fn dispatch(&mut self, action: A) -> Self::Output {
futures::executor::block_on(self.send(action))
}
}
/// Transparent [`Sink`] implementation: every method is forwarded unchanged to
/// the wrapped sink, reached through the pin projection of field `0`.
impl<A, T> Sink<A> for AsyncDispatcher<T>
where
    T: Sink<A>,
{
    /// Errors are exactly those of the wrapped sink.
    type Error = T::Error;

    /// Forwards the readiness poll to the wrapped sink.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner: Pin<&mut T> = self.project().0;
        inner.poll_ready(cx)
    }

    /// Forwards `action` to the wrapped sink's `start_send`.
    fn start_send(self: Pin<&mut Self>, action: A) -> Result<(), Self::Error> {
        let inner: Pin<&mut T> = self.project().0;
        inner.start_send(action)
    }

    /// Forwards the flush poll to the wrapped sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner: Pin<&mut T> = self.project().0;
        inner.poll_flush(cx)
    }

    /// Forwards the close poll to the wrapped sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let inner: Pin<&mut T> = self.project().0;
        inner.poll_close(cx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use mockall::predicate::*;
    use std::{ops::*, vec::Vec};
    use test_strategy::proptest;
    use tokio::runtime;

    // The wrapper must expose the wrapped value through both `Deref` and `DerefMut`.
    #[proptest]
    fn deref(sink: Vec<u8>) {
        let expected = &sink;
        let mut wrapper = AsyncDispatcher(sink.clone());

        assert_eq!(Deref::deref(&wrapper), expected);
        assert_eq!(DerefMut::deref_mut(&mut wrapper), expected);
    }

    // Dispatching through the wrapper must hand the action to the wrapped mock
    // exactly once and return whatever the mock reports.
    #[proptest]
    fn dispatch(action: u8, result: Result<(), u8>) {
        let mut inner = MockDispatcher::new();
        inner
            .expect_dispatch()
            .with(eq(action))
            .once()
            .return_const(result);

        let mut wrapper = AsyncDispatcher(inner);
        let outcome = Dispatcher::dispatch(&mut wrapper, action);

        assert_eq!(outcome, result);
    }

    // Used as a `Sink`, the wrapper must forward the sent action to the wrapped
    // mock exactly once, surface its result, and then close successfully.
    #[proptest]
    fn sink(action: u8, result: Result<(), u8>) {
        // The runtime is built before the mock so that a build failure returns
        // early without leaving an unmet expectation behind.
        let executor = runtime::Builder::new_multi_thread().build()?;

        let mut inner = MockDispatcher::new();
        inner
            .expect_dispatch()
            .with(eq(action))
            .once()
            .return_const(result);

        let mut wrapper = AsyncDispatcher(inner);

        let sent = executor.block_on(wrapper.send(action));
        assert_eq!(sent, result);

        let closed = executor.block_on(wrapper.close());
        assert_eq!(closed, Ok(()));
    }
}