use crate::dispatcher::Dispatcher;
use crate::reactor::Reactor;
use crate::reducer::Reducer;
use core::mem::replace;
use derive_more::Deref;
#[cfg(feature = "async")]
use pin_project::pin_project;
/// A state container that pairs a `state` value with a `reactor`.
///
/// The derived `Deref` targets the `state` field, so a `Store` can be read as
/// its state. With the `async` feature enabled the struct is pin-projected
/// (projection type `PinnedStore`) and `reactor` is the pinned field.
#[cfg_attr(feature = "async", pin_project(project = PinnedStore))]
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Hash, Deref)]
pub struct Store<S, R> {
/// The state value; this is the `Deref` target.
#[deref]
state: S,
/// The reactor; structurally pinned when the `async` feature is enabled.
#[cfg_attr(feature = "async", pin)]
reactor: R,
}
impl<S, R> Store<S, R> {
pub fn new(state: S, reactor: R) -> Self {
Self { state, reactor }
}
pub fn subscribe(&mut self, reactor: impl Into<R>) -> R {
replace(&mut self.reactor, reactor.into())
}
}
impl<A, S, R> Dispatcher<A> for Store<S, R>
where
    S: Reducer<A>,
    R: Reactor<S>,
{
    /// The result reported by the reactor after the state has been reduced.
    type Output = Result<(), R::Error>;

    /// Reduces `action` into the state, then passes the updated state to the
    /// reactor and returns whatever the reactor reports.
    fn dispatch(&mut self, action: A) -> Self::Output {
        self.state.reduce(action);
        // The reactor only observes the state, after the reduction is done.
        let updated = &self.state;
        self.reactor.react(updated)
    }
}
#[cfg(feature = "async")]
mod sink {
use super::*;
use crate::dispatcher::AsyncDispatcher;
use derive_more::{Display, Error};
use futures::channel::mpsc::channel;
use futures::prelude::*;
use futures::sink::Sink;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Lets a store be driven as a `Sink` of actions when its reactor is itself a
/// `Sink` of state references: every action is reduced into the state and the
/// state is then sent to the reactor. Readiness, flushing and closing are
/// delegated to the reactor.
impl<A, S, R, E> Sink<A> for Store<S, R>
where
    S: Reducer<A>,
    R: for<'s> Sink<&'s S, Error = E>,
{
    type Error = E;

    /// Ready exactly when the reactor sink is ready.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let PinnedStore { reactor, .. } = self.project();
        reactor.poll_ready(cx)
    }

    /// Reduces `action` into the state, then starts sending the state to the
    /// reactor sink.
    fn start_send(self: Pin<&mut Self>, action: A) -> Result<(), Self::Error> {
        let this = self.project();
        this.state.reduce(action);
        this.reactor.start_send(this.state)
    }

    /// Flushes the reactor sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let PinnedStore { reactor, .. } = self.project();
        reactor.poll_flush(cx)
    }

    /// Closes the reactor sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let PinnedStore { reactor, .. } = self.project();
        reactor.poll_close(cx)
    }
}
/// Error type of the dispatcher handle returned by `Store::into_task`.
#[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Hash, Error)]
pub enum DispatchError {
/// Sending an action to the task failed; `into_task` maps every error from
/// the channel sender to this variant.
#[display(fmt = "The spawned task has terminated and cannot receive further actions")]
Terminated,
}
impl<S, R> Store<S, R> {
    /// Splits the store into a future that drives it and a cloneable
    /// dispatcher handle.
    ///
    /// Actions sent through the handle travel over a channel created with
    /// `channel(0)` and are forwarded into the store's `Sink` implementation
    /// by the returned future. The future's output carries the store's sink
    /// error type `E`. Any error on the sending side is reported as
    /// [`DispatchError::Terminated`].
    pub fn into_task<A, E>(
        self,
    ) -> (
        impl Future<Output = Result<(), E>>,
        impl Dispatcher<A, Output = Result<(), DispatchError>>
            + Sink<A, Error = DispatchError>
            + Clone,
    )
    where
        Self: Sink<A, Error = E>,
    {
        let (sender, receiver) = channel(0);
        // Wrap each received action in `Ok` so the stream can be forwarded
        // into the store's sink.
        let task = receiver.map(Ok).forward(self);
        let handle = sender.sink_map_err(|_| DispatchError::Terminated);
        (task, AsyncDispatcher(handle))
    }
}
}
#[cfg(feature = "async")]
pub use sink::*;
#[cfg(test)]
mod tests {
use super::*;
use crate::reactor::MockReactor;
use crate::reducer::MockReducer;
use mockall::predicate::*;
use std::ops::Deref;
use test_strategy::proptest;
#[cfg(feature = "async")]
use crate::reactor::AsyncReactor;
#[cfg(feature = "async")]
use futures::SinkExt;
#[cfg(feature = "async")]
use tokio::runtime;
#[cfg(feature = "async")]
use std::thread::yield_now;
#[proptest]
fn default() {
Store::<(), ()>::default();
}
/// Dereferencing a store yields a reference to its state.
#[proptest]
fn deref(state: u8) {
    let store = Store::new(state, ());
    let target: &u8 = store.deref();
    assert_eq!(target, &store.state);
}
/// `Store::new` keeps the given state and reactor unchanged.
#[proptest]
fn new(state: u8, reactor: u8) {
    let built = Store::new(state, reactor);
    assert_eq!(built.state, state);
    assert_eq!(built.reactor, reactor);
}
/// Cloning a store clones both the state and the reactor exactly once each.
#[proptest]
fn clone(a: usize, b: usize) {
    // State mock: reports id `a` and may be cloned once into a mock with the same id.
    let mut state_mock = MockReducer::<()>::new();
    state_mock.expect_id().return_const(a);
    state_mock.expect_clone().once().returning(move || {
        let mut cloned = MockReducer::new();
        cloned.expect_id().return_const(a);
        cloned
    });

    // Reactor mock: same arrangement with id `b`.
    let mut reactor_mock = MockReactor::<(), ()>::new();
    reactor_mock.expect_id().return_const(b);
    reactor_mock.expect_clone().once().returning(move || {
        let mut cloned = MockReactor::new();
        cloned.expect_id().return_const(b);
        cloned
    });

    #[allow(clippy::redundant_clone)]
    let copy = Store::new(state_mock, reactor_mock).clone();
    assert_eq!(copy.state.id(), a);
    assert_eq!(copy.reactor.id(), b);
}
/// `subscribe` returns the previous reactor and leaves the new one installed.
#[proptest]
fn subscribe(a: usize, b: usize) {
    let mut first = MockReactor::<(), ()>::new();
    first.expect_id().return_const(a);
    let mut store = Store::new((), first);

    let mut second = MockReactor::<_, ()>::new();
    second.expect_id().return_const(b);

    assert_eq!(store.subscribe(second).id(), a);
    assert_eq!(store.reactor.id(), b);
}
/// Dispatching reduces the action exactly once, shows the (uncloned) state to
/// the reactor exactly once, and returns the reactor's result.
#[proptest]
fn dispatch(action: u8, result: Result<(), u8>, id: usize) {
    let mut state_mock = MockReducer::new();
    state_mock.expect_id().return_const(id);
    state_mock.expect_clone().never();
    state_mock
        .expect_reduce()
        .with(eq(action))
        .once()
        .return_const(());

    let mut reactor_mock = MockReactor::new();
    reactor_mock
        .expect_react()
        .with(function(move |seen: &MockReducer<_>| seen.id() == id))
        .once()
        .return_const(result);

    let mut store = Store::new(state_mock, reactor_mock);
    let outcome = Dispatcher::dispatch(&mut store, action);
    assert_eq!(outcome, result);
}
/// Sending an action through the store's `Sink` reduces it once and yields
/// the reactor's result; closing the store afterwards succeeds.
#[cfg(feature = "async")]
#[proptest]
fn sink(action: u8, result: Result<(), u8>, id: usize) {
    let rt = runtime::Builder::new_multi_thread().build()?;

    let mut state_mock = MockReducer::new();
    state_mock.expect_id().return_const(id);
    // Clones of the state keep the same id but must never be reduced or cloned again.
    state_mock.expect_clone().returning(move || {
        let mut cloned = MockReducer::new();
        cloned.expect_id().return_const(id);
        cloned.expect_reduce().never();
        cloned.expect_clone().never();
        cloned
    });
    state_mock
        .expect_reduce()
        .with(eq(action))
        .once()
        .return_const(());

    let mut reactor_mock = MockReactor::new();
    reactor_mock
        .expect_react()
        .with(function(move |seen: &MockReducer<_>| seen.id() == id))
        .once()
        .return_const(result);

    let mut store = Store::new(state_mock, AsyncReactor(reactor_mock));
    let sent = rt.block_on(store.send(action));
    assert_eq!(sent, result);
    let closed = rt.block_on(store.close());
    assert_eq!(closed, Ok(()));
}
/// A store turned into a task accepts one dispatched action, and after the
/// dispatcher is closed the task completes with the reactor's result.
#[cfg(feature = "async")]
#[proptest]
fn task(action: u8, result: Result<(), u8>, id: usize) {
    let rt = runtime::Builder::new_multi_thread().build()?;

    let mut state_mock = MockReducer::new();
    state_mock.expect_id().return_const(id);
    // Clones of the state keep the same id but must never be reduced or cloned again.
    state_mock.expect_clone().returning(move || {
        let mut cloned = MockReducer::new();
        cloned.expect_id().return_const(id);
        cloned.expect_reduce().never();
        cloned.expect_clone().never();
        cloned
    });
    state_mock
        .expect_reduce()
        .with(eq(action))
        .once()
        .return_const(());

    let mut reactor_mock = MockReactor::new();
    reactor_mock
        .expect_react()
        .with(function(move |seen: &MockReducer<_>| seen.id() == id))
        .once()
        .return_const(result);

    let store = Store::new(state_mock, AsyncReactor(reactor_mock));
    let (task, mut dispatcher) = store.into_task();
    let join = rt.spawn(task);

    assert_eq!(dispatcher.dispatch(action), Ok(()));
    assert_eq!(rt.block_on(dispatcher.close()), Ok(()));
    assert_eq!(rt.block_on(join)?, result);
}
/// When the reactor fails, later dispatches eventually report
/// `DispatchError::Terminated` and the task resolves to the reactor's error.
#[cfg(feature = "async")]
#[proptest]
fn error(action: u8, error: u8, id: usize) {
    let rt = runtime::Builder::new_multi_thread().build()?;

    let mut state_mock = MockReducer::new();
    state_mock.expect_id().return_const(id);
    // Clones of the state keep the same id but must never be reduced or cloned again.
    state_mock.expect_clone().returning(move || {
        let mut cloned = MockReducer::new();
        cloned.expect_id().return_const(id);
        cloned.expect_reduce().never();
        cloned.expect_clone().never();
        cloned
    });
    state_mock
        .expect_reduce()
        .with(eq(action))
        .once()
        .return_const(());

    let mut reactor_mock = MockReactor::new();
    reactor_mock
        .expect_react()
        .with(function(move |seen: &MockReducer<_>| seen.id() == id))
        .once()
        .return_const(Err(error));

    let store = Store::new(state_mock, AsyncReactor(reactor_mock));
    let (task, mut dispatcher) = store.into_task();
    let join = rt.spawn(task);

    assert_eq!(dispatcher.dispatch(action), Ok(()));

    // Keep dispatching, yielding the thread between attempts, until the
    // dispatcher reports a failure.
    let failure = loop {
        if let Err(e) = dispatcher.dispatch(action) {
            break e;
        }
        yield_now();
    };
    assert_eq!(failure, DispatchError::Terminated);

    assert_eq!(rt.block_on(join)?, Err(error));
}
}