1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::reactor::*;
use std::sync::mpsc::{SendError, Sender};
impl<S> Reactor<S> for Sender<S>
where
S: Clone,
{
type Output = Result<(), SendError<S>>;
fn react(&self, state: &S) -> Self::Output {
self.send(state.clone())
}
}
#[cfg(feature = "async")]
use futures::{executor::block_on, sink::SinkExt};
#[cfg(feature = "async")]
use futures::channel::mpsc::{SendError as AsyncSendError, Sender as AsyncSender};
#[cfg(feature = "async")]
impl<S> Reactor<S> for AsyncSender<S>
where
S: Clone,
{
type Output = Result<(), AsyncSendError>;
fn react(&self, state: &S) -> Self::Output {
block_on(self.clone().send(state.clone()))
}
}
#[cfg(feature = "async")]
use futures::channel::mpsc::{TrySendError, UnboundedSender};
#[cfg(feature = "async")]
impl<S> Reactor<S> for UnboundedSender<S>
where
S: Clone,
{
type Output = Result<(), TrySendError<S>>;
fn react(&self, state: &S) -> Self::Output {
self.unbounded_send(state.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use proptest::*;
#[cfg(feature = "async")]
use futures::stream::StreamExt;
proptest! {
#[test]
fn std(states: Vec<u8>) {
let (tx, rx) = std::sync::mpsc::channel();
for state in &states {
assert_eq!(tx.react(state), Ok(()));
}
assert_eq!(rx.iter().take(states.len()).collect::<Vec<_>>(), states);
drop(rx);
assert_eq!(tx.react(&0), Err(SendError(0)));
}
}
proptest! {
#[cfg(feature = "async")]
#[test]
fn sink(mut states: Vec<u8>) {
let (tx, mut rx) = futures::channel::mpsc::channel(0);
for state in &states {
assert_eq!(tx.react(state), Ok(()));
}
for state in states {
assert_eq!(block_on(rx.next()), Some(state));
}
drop(rx);
assert_ne!(tx.react(&0), Ok(()));
}
}
proptest! {
#[cfg(feature = "async")]
#[test]
fn unbounded(mut states: Vec<u8>) {
let (tx, mut rx) = futures::channel::mpsc::unbounded();
for state in &states {
assert_eq!(tx.react(state), Ok(()));
}
for state in states {
assert_eq!(block_on(rx.next()), Some(state));
}
drop(rx);
assert_ne!(tx.react(&0), Ok(()));
}
}
}