1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
extern crate desync;
extern crate futures;
use desync::*;
use futures::stream;
use futures::future;
use futures::executor;
use futures::sink::{SinkExt};
use futures::stream::{StreamExt};
use futures::channel::mpsc;
use futures::prelude::*;
use std::sync::*;
use std::thread;
use std::time::Duration;
#[test]
fn pipe_in_simple_stream() {
// Create a stream
let stream = vec![1, 2, 3];
let stream = stream::iter(stream);
// Create an object for the stream to be piped into
let obj = Arc::new(Desync::new(vec![]));
// Pipe the stream into the object
pipe_in(Arc::clone(&obj), stream, |core: &mut Vec<Result<i32, ()>>, item| { core.push(Ok(item)); Box::pin(future::ready(())) });
// Delay to allow the messages to be processed on the stream
thread::sleep(Duration::from_millis(10));
// Once the stream is drained, the core should contain Ok(1), Ok(2), Ok(3)
assert!(obj.sync(|core| core.clone()) == vec![Ok(1), Ok(2), Ok(3)])
}
#[test]
fn pipe_in_mpsc_receiver() {
// Create a channel to send to the object
let (mut sender, receiver) = mpsc::channel(0);
// Create an object
let obj = Arc::new(Desync::new(vec![]));
// Add anything received to the vector via a pipe
pipe_in(Arc::clone(&obj), receiver, |core, item| { core.push(item); Box::pin(future::ready(())) });
// Initially empty
assert!(obj.sync(|core| core.clone()) == vec![]);
// Send some values
let send_values = async {
sender.send(1).await.unwrap();
sender.send(2).await.unwrap();
};
executor::block_on(send_values);
// Delay to allow the messages to be processed on the stream
// TODO: fix so this isn't needed. This happens because there's a race between when 'poll'
// is called in the pipe and the 'async' call
thread::sleep(Duration::from_millis(20));
// Should be available on the core
assert!(obj.sync(|core| core.clone()) == vec![1, 2]);
}
#[test]
fn pipe_through() {
// Create a channel we'll use to send data to the pipe
let (mut sender, receiver) = mpsc::channel(10);
// Create an object to pipe through
let obj = Arc::new(Desync::new(1));
// Create a pipe that adds values from the stream to the value in the object
let mut pipe_out = pipe(Arc::clone(&obj), receiver, |core, item| future::ready(item + *core).boxed());
// Start things running
executor::block_on(async {
sender.send(2).await.unwrap();
let next_val = pipe_out.next().await;
assert!(next_val == Some(3), "{:?} != Some(3)", next_val);
sender.send(42).await.unwrap();
let next_val = pipe_out.next().await;
assert!(next_val == Some(43), "{:?} != Some(43)", next_val);
// It is possible for a poll to already be pending again at this point, which may race to read the value we set later on, so we synchronise to ensure they are all processed
obj.sync(|_| { });
// Changing the value should change the output
obj.desync(|core| *core = 2);
sender.send(44).await.unwrap();
let next_val = pipe_out.next().await;
assert!(next_val == Some(46), "{:?} != Some(46)", next_val);
});
}
#[test]
fn pipe_through_1000() {
for _ in 0..1000 {
// Create a channel we'll use to send data to the pipe
let (mut sender, receiver) = mpsc::channel(10);
// Create an object to pipe through
let obj = Arc::new(Desync::new(1));
// Create a pipe that adds values from the stream to the value in the object
let mut pipe_out = pipe(Arc::clone(&obj), receiver, |core, item| future::ready(item + *core).boxed());
// Start things running
executor::block_on(async {
sender.send(2).await.unwrap();
let next_val = pipe_out.next().await;
assert!(next_val == Some(3), "{:?} != Some(3)", next_val);
sender.send(42).await.unwrap();
let next_val = pipe_out.next().await;
assert!(next_val == Some(43), "{:?} != Some(43)", next_val);
// It is possible for a poll to already be pending again at this point, which may race to read the value we set later on, so we synchronise to ensure they are all processed
obj.sync(|_| { });
// Changing the value should change the output
obj.desync(|core| *core = 2);
sender.send(44).await.unwrap();
let next_val = pipe_out.next().await;
assert!(next_val == Some(46), "{:?} != Some(46)", next_val);
});
}
}
#[test]
fn pipe_through_stream_closes() {
let mut pipe_out_with_closed_stream = {
// Create a channel we'll use to send data to the pipe
let (mut sender, receiver) = mpsc::channel(10);
// Create an object to pipe through
let obj = Arc::new(Desync::new(1));
// Create a pipe that adds values from the stream to the value in the object
let mut pipe_out = pipe(Arc::clone(&obj), receiver, |core, item: i32| future::ready(item + *core).boxed());
// Start things running
executor::block_on(async {
sender.send(2).await.unwrap();
assert!(pipe_out.next().await == Some(3))
});
pipe_out
};
executor::block_on(async {
// The sender is now closed (the sender and receiver are dropped after the block above), so the pipe should close too
assert!(pipe_out_with_closed_stream.next().await == None);
});
}
#[test]
fn pipe_through_produces_backpressure() {
// Create a channel we'll use to send data to the pipe
let (mut sender, receiver) = mpsc::channel(0);
// Create an object to pipe through
let obj = Arc::new(Desync::new(1));
// Create a pipe that adds values from the stream to the value in the object
let mut pipe_out = pipe(Arc::clone(&obj), receiver, |core, item: i32| future::ready(item + *core).boxed());
// Set the backpressure depth to 3
pipe_out.set_backpressure_depth(3);
// Start things running. We never read from this pipe here
executor::block_on(async {
// Send 3 events to the pipe. Wait a bit between them to allow for processing time
for _x in 0..3 {
let mut iter = 0;
let succeeded = loop {
iter = iter + 1;
if iter > 1000 { break false; }
if sender.try_send(1) == Ok(()) {
break true;
}
// The wait here allows the message to flow through to the pipe (if we call try_send again before the pipe has a chance to accept the input)
thread::sleep(Duration::from_millis(5));
};
assert!(succeeded, "Could not queue next item");
}
// This will stick in the channel (pipe should not be accepting more input)
let mut iter = 0;
let succeeded = loop {
iter = iter + 1;
if iter > 1000 { break false; }
if sender.try_send(2) == Ok(()) {
break true;
}
thread::sleep(Duration::from_millis(5));
};
assert!(succeeded, "Could not queue final item");
thread::sleep(Duration::from_millis(5));
// Channel will push back on this one
let channel_full = sender.try_send(3);
assert!(channel_full.is_err());
assert!(channel_full.unwrap_err().is_full());
});
}