1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::reactor::*;
use std::sync::mpsc::{SendError, Sender};

/// Lets a [`std::sync::mpsc::Sender`] act as a [`Reactor`].
///
/// Each call to `react` clones the given state and sends the clone through the channel;
/// the outcome of that send is handed back to the caller unchanged.
impl<S: Clone> Reactor<S> for Sender<S> {
    /// `Ok(())` if the state was sent, otherwise the [`SendError`] holding the unsent clone.
    type Output = Result<(), SendError<S>>;

    fn react(&self, state: &S) -> Self::Output {
        // The channel takes ownership of what it sends, hence the clone of the borrowed state.
        let snapshot = S::clone(state);
        Sender::send(self, snapshot)
    }
}

#[cfg(feature = "async")]
use futures::{executor::block_on, sink::SinkExt};

#[cfg(feature = "async")]
use futures::channel::mpsc::{SendError as AsyncSendError, Sender as AsyncSender};

/// Lets a [`futures::channel::mpsc::Sender`] act as a [`Reactor`] (requires [`async`]).
///
/// Each call to `react` clones the given state and waits until the send of that clone
/// has completed (or failed) before returning.
///
/// [`async`]: index.html#experimental-features
#[cfg(feature = "async")]
impl<S: Clone> Reactor<S> for AsyncSender<S> {
    type Output = Result<(), AsyncSendError>;

    fn react(&self, state: &S) -> Self::Output {
        // `SinkExt::send` needs `&mut` access to the sink while `react` only receives `&self`,
        // so the send goes through a fresh clone of this sender.
        let mut sink = AsyncSender::clone(self);
        let snapshot = S::clone(state);

        // Drive the send future to completion right here instead of returning it.
        block_on(sink.send(snapshot))
    }
}

#[cfg(feature = "async")]
use futures::channel::mpsc::{TrySendError, UnboundedSender};

/// Lets a [`futures::channel::mpsc::UnboundedSender`] act as a [`Reactor`] (requires [`async`]).
///
/// Each call to `react` clones the given state and passes the clone to `unbounded_send`;
/// the outcome of that call is handed back to the caller unchanged.
///
/// [`async`]: index.html#experimental-features
#[cfg(feature = "async")]
impl<S: Clone> Reactor<S> for UnboundedSender<S> {
    type Output = Result<(), TrySendError<S>>;

    fn react(&self, state: &S) -> Self::Output {
        // The channel takes ownership of what it sends, hence the clone of the borrowed state.
        let snapshot = S::clone(state);
        UnboundedSender::unbounded_send(self, snapshot)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use proptest::*;

    #[cfg(feature = "async")]
    use futures::stream::StreamExt;

    // Every state handed to a `std::sync::mpsc::Sender` reactor must come out of the
    // receiver in the same order, and reacting must fail once the receiver is gone.
    proptest! {
        #[test]
        fn std(states: Vec<u8>) {
            let (tx, rx) = std::sync::mpsc::channel();

            for state in &states {
                assert_eq!(tx.react(state), Ok(()));
            }

            // `take` bounds the iteration: `rx.iter()` would otherwise block waiting for
            // further messages because `tx` is still alive.
            assert_eq!(rx.iter().take(states.len()).collect::<Vec<_>>(), states);

            // Drop the receiver so that the channel is disconnected.
            drop(rx);

            // The state that could not be delivered is handed back inside the error.
            assert_eq!(tx.react(&0), Err(SendError(0)));
        }
    }

    // Same contract as `std`, for the bounded `futures` channel.
    proptest! {
        #[cfg(feature = "async")]
        #[test]
        fn sink(states: Vec<u8>) {
            let (tx, mut rx) = futures::channel::mpsc::channel(0);

            for state in &states {
                assert_eq!(tx.react(state), Ok(()));
            }

            for state in states {
                assert_eq!(block_on(rx.next()), Some(state));
            }

            // Drop the receiver so that the channel is disconnected.
            drop(rx);

            assert_ne!(tx.react(&0), Ok(()));
        }
    }

    // Same contract as `std`, for the unbounded `futures` channel.
    proptest! {
        #[cfg(feature = "async")]
        #[test]
        fn unbounded(states: Vec<u8>) {
            let (tx, mut rx) = futures::channel::mpsc::unbounded();

            for state in &states {
                assert_eq!(tx.react(state), Ok(()));
            }

            for state in states {
                assert_eq!(block_on(rx.next()), Some(state));
            }

            // Drop the receiver so that the channel is disconnected.
            drop(rx);

            assert_ne!(tx.react(&0), Ok(()));
        }
    }
}