// reducer 3.0.1
//
// A predictable reactive framework for Rust apps
// Documentation
use crate::dispatcher::Dispatcher;
use crate::reactor::Reactor;
use crate::reducer::Reducer;
use core::mem::replace;
use derive_more::Deref;

#[cfg(feature = "async")]
use pin_project::pin_project;

/// A reactive state container.
///
/// The only way to mutate the internal state managed by [`Store`] is by
/// [dispatching] actions on it.
/// The associated [`Reactor`] is notified upon every state transition.
///
/// [dispatching]: Store::dispatch
///
/// # Example
///
/// ```rust
/// use reducer::*;
/// use std::error::Error;
/// use std::io::{self, Write};
///
/// // The state of your app.
/// struct Calculator(i32);
///
/// // Actions the user can trigger.
/// struct Add(i32);
/// struct Sub(i32);
/// struct Mul(i32);
/// struct Div(i32);
///
/// impl Reducer<Add> for Calculator {
///     fn reduce(&mut self, Add(x): Add) {
///         self.0 += x;
///     }
/// }
///
/// impl Reducer<Sub> for Calculator {
///     fn reduce(&mut self, Sub(x): Sub) {
///         self.0 -= x;
///     }
/// }
///
/// impl Reducer<Mul> for Calculator {
///     fn reduce(&mut self, Mul(x): Mul) {
///         self.0 *= x;
///     }
/// }
///
/// impl Reducer<Div> for Calculator {
///     fn reduce(&mut self, Div(x): Div) {
///         self.0 /= x;
///     }
/// }
///
/// // The user interface.
/// struct Console;
///
/// impl Reactor<Calculator> for Console {
///     type Error = io::Error;
///     fn react(&mut self, state: &Calculator) -> io::Result<()> {
///         io::stdout().write_fmt(format_args!("{}\n", state.0))
///     }
/// }
///
/// fn main() -> Result<(), Box<dyn Error>> {
///     let mut store = Store::new(Calculator(0), Console);
///
///     store.dispatch(Add(5))?; // displays "5"
///     store.dispatch(Mul(3))?; // displays "15"
///     store.dispatch(Sub(1))?; // displays "14"
///     store.dispatch(Div(7))?; // displays "2"
///
///     Ok(())
/// }
/// ```
// With the `async` feature enabled, `pin_project` generates the `PinnedStore`
// projection type through which the fields of a pinned `Store` are accessed.
#[cfg_attr(feature = "async", pin_project(project = PinnedStore))]
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Hash, Deref)]
pub struct Store<S, R> {
    // The current state; `Store` dereferences to this field.
    #[deref]
    state: S,
    // The reactor that is handed a reference to the state on every dispatch;
    // marked as pinned for the projection when the `async` feature is enabled.
    #[cfg_attr(feature = "async", pin)]
    reactor: R,
}

impl<S, R> Store<S, R> {
    /// Constructs the Store given the initial state and a [`Reactor`].
    pub fn new(state: S, reactor: R) -> Self {
        Self { state, reactor }
    }

    /// Replaces the [`Reactor`] and returns the previous one.
    pub fn subscribe(&mut self, reactor: impl Into<R>) -> R {
        replace(&mut self.reactor, reactor.into())
    }
}

impl<A, S, R> Dispatcher<A> for Store<S, R>
where
    S: Reducer<A>,
    R: Reactor<S>,
{
    type Output = Result<(), R::Error>;

    /// Updates the state via [`Reducer::reduce`] and notifies the [`Reactor`],
    /// returning the result of calling [`Reactor::react`] with a reference
    /// to the new state.
    fn dispatch(&mut self, action: A) -> Self::Output {
        self.state.reduce(action);
        self.reactor.react(&self.state)
    }
}

#[cfg(feature = "async")]
mod sink {
    use super::*;
    use crate::dispatcher::AsyncDispatcher;
    use derive_more::{Display, Error};
    use futures::channel::mpsc::channel;
    use futures::prelude::*;
    use futures::sink::Sink;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// Lets a Store be driven as a Sink of actions (requires [`async`]).
    ///
    /// Readiness, flushing and closing are delegated to the reactor; sending an
    /// action reduces the state and then forwards a reference to it to the reactor.
    ///
    /// [`async`]: index.html#optional-features
    impl<A, S, R, E> Sink<A> for Store<S, R>
    where
        S: Reducer<A>,
        R: for<'s> Sink<&'s S, Error = E>,
    {
        type Error = E;

        fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let PinnedStore { reactor, .. } = self.project();
            reactor.poll_ready(cx)
        }

        fn start_send(self: Pin<&mut Self>, action: A) -> Result<(), Self::Error> {
            let PinnedStore {
                state: current,
                reactor: sink,
            } = self.project();

            // Update the state first, then let the reactor observe the result.
            current.reduce(action);
            sink.start_send(current)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let PinnedStore { reactor, .. } = self.project();
            reactor.poll_flush(cx)
        }

        fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            let PinnedStore { reactor, .. } = self.project();
            reactor.poll_close(cx)
        }
    }

    /// The error returned when dispatching an action to a [spawned] [`Store`] fails
    /// (requires [`async`]).
    ///
    /// This is the error type reported by the asynchronous dispatcher handle that
    /// [`Store::into_task`] returns.
    ///
    /// [spawned]: Store::into_task
    /// [`async`]: index.html#optional-features
    #[derive(Debug, Display, Copy, Clone, Eq, PartialEq, Hash, Error)]
    pub enum DispatchError {
        /// The [spawned] task has terminated and cannot receive further actions.
        ///
        /// [spawned]: Store::into_task
        #[display(fmt = "The spawned task has terminated and cannot receive further actions")]
        Terminated,
    }

    impl<S, R> Store<S, R> {
        /// Converts the [`Store`] into a task suitable for spawning onto an executor
        /// (requires [`async`]).
        ///
        /// Actions reach the spawned task through a lightweight [`Dispatcher`] handle,
        /// which can be cloned and sent to other threads.
        ///
        /// The task completes
        /// * successfully once the asynchronous [`Dispatcher`] (or the last of its clones)
        ///   is dropped or [closed].
        /// * with an error if [`Store::dispatch`] fails.
        ///
        /// A [`Store`] turned into an asynchronous task accepts a single action type `A`;
        /// defining actions as `enum` variants is an effective way of meeting this
        /// requirement.
        ///
        /// [`async`]: index.html#optional-features
        /// [closed]: futures::sink::SinkExt::close
        ///
        /// # Example
        ///
        /// ```rust
        /// use reducer::*;
        /// use futures::prelude::*;
        /// use std::error::Error;
        /// use std::io::{self, Write};
        /// use tokio::task::spawn;
        ///
        /// // The state of your app.
        /// #[derive(Clone)]
        /// struct Calculator(i32);
        ///
        /// // Actions the user can trigger.
        /// enum Action {
        ///     Add(i32),
        ///     Sub(i32),
        ///     Mul(i32),
        ///     Div(i32),
        /// }
        ///
        /// impl Reducer<Action> for Calculator {
        ///     fn reduce(&mut self, action: Action) {
        ///         match action {
        ///             Action::Add(x) => self.0 += x,
        ///             Action::Sub(x) => self.0 -= x,
        ///             Action::Mul(x) => self.0 *= x,
        ///             Action::Div(x) => self.0 /= x,
        ///         }
        ///     }
        /// }
        ///
        /// #[tokio::main]
        /// async fn main() -> Result<(), Box<dyn Error>> {
        ///     let store = Store::new(
        ///         Calculator(0),
        ///         AsyncReactor(sink::unfold((), |_, state: Calculator| async move {
        ///             writeln!(&mut io::stdout(), "{}", state.0)
        ///         })),
        ///     );
        ///
        ///     // Process incoming actions on a background task.
        ///     let (task, mut dispatcher) = store.into_task();
        ///     let handle = spawn(task);
        ///
        ///     dispatcher.dispatch(Action::Add(5))?; // asynchronously prints "5" to stdout
        ///     dispatcher.dispatch(Action::Mul(3))?; // asynchronously prints "15" to stdout
        ///     dispatcher.dispatch(Action::Sub(1))?; // asynchronously prints "14" to stdout
        ///     dispatcher.dispatch(Action::Div(7))?; // asynchronously prints "2" to stdout
        ///
        ///     // Closing the asynchronous dispatcher signals to the background task that
        ///     // it can terminate once all pending actions have been processed.
        ///     dispatcher.close().await?;
        ///
        ///     // Wait for the background task to terminate.
        ///     handle.await??;
        ///
        ///     Ok(())
        /// }
        /// ```
        pub fn into_task<A, E>(
            self,
        ) -> (
            impl Future<Output = Result<(), E>>,
            impl Dispatcher<A, Output = Result<(), DispatchError>>
                + Sink<A, Error = DispatchError>
                + Clone,
        )
        where
            Self: Sink<A, Error = E>,
        {
            // Actions travel from the dispatcher handle to the task over a channel.
            let (sender, receiver) = channel(0);

            // The task forwards every received action into the store's `Sink` impl.
            let task = receiver.map(Ok).forward(self);

            // Any failure to send is reported to the caller as `Terminated`.
            let handle = AsyncDispatcher(sender.sink_map_err(|_| DispatchError::Terminated));

            (task, handle)
        }
    }
}

#[cfg(feature = "async")]
pub use sink::*;

#[cfg(test)]
mod tests {
    use super::*;
    use crate::reactor::MockReactor;
    use crate::reducer::MockReducer;
    use mockall::predicate::*;
    use std::ops::Deref;
    use test_strategy::proptest;

    #[cfg(feature = "async")]
    use crate::reactor::AsyncReactor;

    #[cfg(feature = "async")]
    use futures::SinkExt;

    #[cfg(feature = "async")]
    use tokio::runtime;

    #[cfg(feature = "async")]
    use std::thread::yield_now;

    /// `Store::default()` must equal a store built from the default state and
    /// the default reactor.
    // A plain `#[test]` is enough here: there are no generated inputs, so a
    // property test would only repeat the exact same check for every case.
    #[test]
    fn default() {
        assert_eq!(Store::<(), ()>::default(), Store::new((), ()));
    }

    /// Dereferencing a store must yield a reference to its state.
    #[proptest]
    fn deref(state: u8) {
        let store = Store::new(state, ());
        let target: &u8 = Deref::deref(&store);
        assert_eq!(target, &store.state);
    }

    #[proptest]
    fn new(state: u8, reactor: u8) {
        let store = Store::new(state, reactor);
        assert_eq!(store.state, state);
        assert_eq!(store.reactor, reactor);
    }

    /// Cloning a store must clone both its state and its reactor exactly once.
    #[proptest]
    fn clone(a: usize, b: usize) {
        // State mock: cloned once, the copy reports the same id.
        let mut state = MockReducer::<()>::new();
        state.expect_id().return_const(a);
        state.expect_clone().once().returning(move || {
            let mut copy = MockReducer::new();
            copy.expect_id().return_const(a);
            copy
        });

        // Reactor mock: cloned once, the copy reports the same id.
        let mut observer = MockReactor::<(), ()>::new();
        observer.expect_id().return_const(b);
        observer.expect_clone().once().returning(move || {
            let mut copy = MockReactor::new();
            copy.expect_id().return_const(b);
            copy
        });

        #[allow(clippy::redundant_clone)]
        let cloned = Store::new(state, observer).clone();

        assert_eq!(cloned.state.id(), a);
        assert_eq!(cloned.reactor.id(), b);
    }

    /// `subscribe` must return the reactor that was in place and install the new one.
    #[proptest]
    fn subscribe(a: usize, b: usize) {
        let mut original = MockReactor::<(), ()>::new();
        original.expect_id().return_const(a);

        let mut store = Store::new((), original);

        let mut replacement = MockReactor::<_, ()>::new();
        replacement.expect_id().return_const(b);

        let previous = store.subscribe(replacement);
        assert_eq!(previous.id(), a);
        assert_eq!(store.reactor.id(), b);
    }

    /// Dispatching must reduce the state once, notify the reactor once with that
    /// state, and return the reactor's result without cloning the state.
    #[proptest]
    fn dispatch(action: u8, result: Result<(), u8>, id: usize) {
        let mut state = MockReducer::new();
        state.expect_id().return_const(id);
        state.expect_clone().never();
        state
            .expect_reduce()
            .with(eq(action))
            .once()
            .return_const(());

        // The reactor must observe the very state that was reduced (matched by id).
        let mut observer = MockReactor::new();
        observer
            .expect_react()
            .with(function(move |seen: &MockReducer<_>| seen.id() == id))
            .once()
            .return_const(result);

        let mut store = Store::new(state, observer);
        let outcome = Dispatcher::dispatch(&mut store, action);
        assert_eq!(outcome, result);
    }

    /// Sending an action through the store's `Sink` interface must reduce the
    /// state once, notify the reactor once and report the reactor's result;
    /// closing the store afterwards must succeed.
    #[cfg(feature = "async")]
    #[proptest]
    fn sink(action: u8, result: Result<(), u8>, id: usize) {
        // A runtime is needed to drive the futures returned by `send` and `close`.
        let rt = runtime::Builder::new_multi_thread().build()?;
        let mut reducer = MockReducer::new();
        reducer.expect_id().return_const(id);
        // The state may be cloned (presumably by `AsyncReactor` — confirm in
        // `crate::reactor`); a clone reports the same id but must never be
        // reduced or cloned again.
        reducer.expect_clone().returning(move || {
            let mut mock = MockReducer::new();
            mock.expect_id().return_const(id);
            mock.expect_reduce().never();
            mock.expect_clone().never();
            mock
        });

        // The original state must receive the action exactly once.
        reducer
            .expect_reduce()
            .with(eq(action))
            .once()
            .return_const(());

        // The reactor must be notified exactly once with a state carrying the
        // expected id, and answers with `result`.
        let mut reactor = MockReactor::new();
        reactor
            .expect_react()
            .with(function(move |x: &MockReducer<_>| x.id() == id))
            .once()
            .return_const(result);

        let mut store = Store::new(reducer, AsyncReactor(reactor));
        assert_eq!(rt.block_on(store.send(action)), result);
        assert_eq!(rt.block_on(store.close()), Ok(()));
    }

    /// A store turned into a task must process a dispatched action and, once the
    /// dispatcher is closed, complete with the reactor's result.
    #[cfg(feature = "async")]
    #[proptest]
    fn task(action: u8, result: Result<(), u8>, id: usize) {
        // The runtime both hosts the spawned task and drives `close`/`handle`.
        let rt = runtime::Builder::new_multi_thread().build()?;
        let mut reducer = MockReducer::new();
        reducer.expect_id().return_const(id);
        // The state may be cloned (presumably by `AsyncReactor` — confirm in
        // `crate::reactor`); a clone reports the same id but must never be
        // reduced or cloned again.
        reducer.expect_clone().returning(move || {
            let mut mock = MockReducer::new();
            mock.expect_id().return_const(id);
            mock.expect_reduce().never();
            mock.expect_clone().never();
            mock
        });

        // The original state must receive the action exactly once.
        reducer
            .expect_reduce()
            .with(eq(action))
            .once()
            .return_const(());

        // The reactor must be notified exactly once with a state carrying the
        // expected id, and answers with `result`.
        let mut reactor = MockReactor::new();
        reactor
            .expect_react()
            .with(function(move |x: &MockReducer<_>| x.id() == id))
            .once()
            .return_const(result);

        let store = Store::new(reducer, AsyncReactor(reactor));
        let (task, mut dispatcher) = store.into_task();

        let handle = rt.spawn(task);

        // Dispatching and closing must both succeed from the handle's point of view.
        assert_eq!(dispatcher.dispatch(action), Ok(()));
        assert_eq!(rt.block_on(dispatcher.close()), Ok(()));
        // `?` surfaces a failure to join the task; the remaining value is the
        // task's own output.
        assert_eq!(rt.block_on(handle)?, result);
    }

    /// When the reactor fails, the spawned task must complete with that error and
    /// the dispatcher must eventually report `DispatchError::Terminated`.
    #[cfg(feature = "async")]
    #[proptest]
    fn error(action: u8, error: u8, id: usize) {
        // The runtime both hosts the spawned task and drives `handle`.
        let rt = runtime::Builder::new_multi_thread().build()?;
        let mut reducer = MockReducer::new();
        reducer.expect_id().return_const(id);
        // The state may be cloned (presumably by `AsyncReactor` — confirm in
        // `crate::reactor`); a clone reports the same id but must never be
        // reduced or cloned again.
        reducer.expect_clone().returning(move || {
            let mut mock = MockReducer::new();
            mock.expect_id().return_const(id);
            mock.expect_reduce().never();
            mock.expect_clone().never();
            mock
        });

        // Only the first action may be reduced; later dispatch attempts made by
        // the loop below must not reach the state.
        reducer
            .expect_reduce()
            .with(eq(action))
            .once()
            .return_const(());

        // The reactor is notified exactly once and always fails with `error`.
        let mut reactor = MockReactor::new();
        reactor
            .expect_react()
            .with(function(move |x: &MockReducer<_>| x.id() == id))
            .once()
            .return_const(Err(error));

        let store = Store::new(reducer, AsyncReactor(reactor));
        let (task, mut dispatcher) = store.into_task();

        let handle = rt.spawn(task);

        // The first dispatch is accepted by the handle.
        assert_eq!(dispatcher.dispatch(action), Ok(()));

        // Keep dispatching until the handle notices that the task is gone.
        loop {
            match dispatcher.dispatch(action) {
                // Wait for the information to propagate,
                // that the spawned task has terminated.
                Ok(()) => yield_now(),
                Err(e) => break assert_eq!(e, DispatchError::Terminated),
            }
        }

        // `?` surfaces a failure to join the task; the task itself must have
        // completed with the reactor's error.
        assert_eq!(rt.block_on(handle)?, Err(error));
    }
}