1use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
63use futures::future::FutureExt;
64use futures::ready;
65use futures::sink::SinkExt;
66use futures::stream::StreamExt;
67use futures::{AsyncRead, AsyncWrite};
68use futures_timer::Delay;
69use std::io::{Error, ErrorKind, Result};
70use std::pin::Pin;
71use std::sync::{Arc, Mutex};
72use std::task::{Context, Poll, Waker};
73use std::time::Duration;
74
75pub struct Endpoint {
76 sender: UnboundedSender<Item>,
77
78 receiver: UnboundedReceiver<Item>,
79 next_item: Option<Item>,
80
81 shared_send: Arc<Mutex<Shared>>,
82 shared_receive: Arc<Mutex<Shared>>,
83
84 delay: Duration,
85 capacity: usize,
86}
87
88pub fn new_constrained_connection(
94 bandwidth_bits_per_second: u64,
95 rtt: Duration,
96) -> (Endpoint, Endpoint) {
97 let single_direction_capacity_bytes =
98 single_direction_capacity_bytes(bandwidth_bits_per_second, rtt);
99 assert!(single_direction_capacity_bytes > 0);
100 let single_direction_delay = rtt / 2;
101
102 let (a_to_b_sender, a_to_b_receiver) = unbounded();
103 let (b_to_a_sender, b_to_a_receiver) = unbounded();
104
105 let a_to_b_shared = Arc::new(Mutex::new(Default::default()));
106 let b_to_a_shared = Arc::new(Mutex::new(Default::default()));
107
108 let a = Endpoint {
109 sender: a_to_b_sender,
110 receiver: b_to_a_receiver,
111 next_item: None,
112
113 shared_send: a_to_b_shared.clone(),
114 shared_receive: b_to_a_shared.clone(),
115
116 delay: single_direction_delay,
117 capacity: single_direction_capacity_bytes,
118 };
119
120 let b = Endpoint {
121 sender: b_to_a_sender,
122 receiver: a_to_b_receiver,
123 next_item: None,
124
125 shared_send: b_to_a_shared,
126 shared_receive: a_to_b_shared,
127
128 delay: single_direction_delay,
129 capacity: single_direction_capacity_bytes,
130 };
131
132 (a, b)
133}
134
135pub fn new_unconstrained_connection() -> (Endpoint, Endpoint) {
136 let (a_to_b_sender, a_to_b_receiver) = unbounded();
137 let (b_to_a_sender, b_to_a_receiver) = unbounded();
138
139 let a_to_b_shared = Arc::new(Mutex::new(Default::default()));
140 let b_to_a_shared = Arc::new(Mutex::new(Default::default()));
141
142 let a = Endpoint {
143 sender: a_to_b_sender,
144 receiver: b_to_a_receiver,
145 next_item: None,
146
147 shared_send: a_to_b_shared.clone(),
148 shared_receive: b_to_a_shared.clone(),
149
150 delay: Duration::from_secs(0),
151 capacity: std::usize::MAX,
152 };
153
154 let b = Endpoint {
155 sender: b_to_a_sender,
156 receiver: a_to_b_receiver,
157 next_item: None,
158
159 shared_send: b_to_a_shared,
160 shared_receive: a_to_b_shared,
161
162 delay: Duration::from_secs(0),
163 capacity: std::usize::MAX,
164 };
165
166 (a, b)
167}
168
169struct Item {
170 data: Vec<u8>,
171 delay: Delay,
172}
173
174impl Unpin for Endpoint {}
175
176impl AsyncRead for Endpoint {
177 fn poll_read(
178 mut self: Pin<&mut Self>,
179 cx: &mut Context<'_>,
180 buf: &mut [u8],
181 ) -> Poll<Result<usize>> {
182 let item = match self.next_item.as_mut() {
183 Some(item) => item,
184 None => match ready!(self.receiver.poll_next_unpin(cx)) {
185 Some(item) => {
186 self.next_item = Some(item);
187 self.next_item.as_mut().unwrap()
188 }
189 None => {
190 return Poll::Ready(Ok(0));
191 }
192 },
193 };
194
195 ready!(item.delay.poll_unpin(cx));
196
197 let n = std::cmp::min(buf.len(), item.data.len());
198
199 buf[0..n].copy_from_slice(&item.data[0..n]);
200
201 if n < item.data.len() {
202 item.data = item.data.split_off(n);
203 } else {
204 self.next_item.take().unwrap();
205 }
206
207 let mut shared = self.shared_receive.lock().unwrap();
208 if let Some(waker) = shared.waker_write.take() {
209 waker.wake();
210 }
211
212 debug_assert!(shared.size >= n);
213 shared.size -= n;
214
215 Poll::Ready(Ok(n))
216 }
217}
218
219impl AsyncWrite for Endpoint {
220 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
221 let mut shared = self.shared_send.lock().unwrap();
222 let n = std::cmp::min(self.capacity - shared.size, buf.len());
223 if n == 0 {
224 shared.waker_write = Some(cx.waker().clone());
225 return Poll::Pending;
226 }
227
228 self.sender
229 .unbounded_send(Item {
230 data: buf[0..n].to_vec(),
231 delay: Delay::new(self.delay),
232 })
233 .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))?;
234
235 shared.size += n;
236
237 Poll::Ready(Ok(n))
238 }
239
240 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
241 ready!(self.sender.poll_flush_unpin(cx)).unwrap();
242 Poll::Ready(Ok(()))
243 }
244
245 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
246 ready!(self.sender.poll_close_unpin(cx)).unwrap();
247 Poll::Ready(Ok(()))
248 }
249}
250
251#[derive(Default)]
252struct Shared {
253 waker_write: Option<Waker>,
254 size: usize,
255}
256
257fn single_direction_capacity_bytes(bandwidth_bits_per_second: u64, rtt: Duration) -> usize {
258 let bandwidth_delay_product: u128 =
259 bandwidth_bits_per_second as u128 * rtt.as_micros() / 1_000_000u128 / 8;
260 (bandwidth_delay_product / 2) as usize
261}
262
263pub mod samples {
266 use super::{new_constrained_connection, Endpoint};
267 use std::time::Duration;
268
269 pub fn satellite_network() -> (u64, Duration, (Endpoint, Endpoint)) {
270 let bandwidth = 512_000;
271 let rtt = Duration::from_millis(900);
272 let connections = new_constrained_connection(bandwidth, rtt);
273
274 (bandwidth, rtt, connections)
275 }
276
277 pub fn residential_dsl() -> (u64, Duration, (Endpoint, Endpoint)) {
278 let bandwidth = 2_000_000;
279 let rtt = Duration::from_millis(50);
280 let connections = new_constrained_connection(bandwidth, rtt);
281
282 (bandwidth, rtt, connections)
283 }
284
285 pub fn mobile_hsdpa() -> (u64, Duration, (Endpoint, Endpoint)) {
286 let bandwidth = 6_000_000;
287 let rtt = Duration::from_millis(100);
288 let connections = new_constrained_connection(bandwidth, rtt);
289
290 (bandwidth, rtt, connections)
291 }
292
293 pub fn residential_adsl2() -> (u64, Duration, (Endpoint, Endpoint)) {
294 let bandwidth = 20_000_000;
295 let rtt = Duration::from_millis(50);
296 let connections = new_constrained_connection(bandwidth, rtt);
297
298 (bandwidth, rtt, connections)
299 }
300
301 pub fn residential_cable_internet() -> (u64, Duration, (Endpoint, Endpoint)) {
302 let bandwidth = 200_000_000;
303 let rtt = Duration::from_millis(20);
304 let connections = new_constrained_connection(bandwidth, rtt);
305
306 (bandwidth, rtt, connections)
307 }
308
309 pub fn gbit_lan() -> (u64, Duration, (Endpoint, Endpoint)) {
310 let bandwidth = 1_000_000_000;
311 let rtt = Duration::from_micros(100);
312 let connections = new_constrained_connection(bandwidth, rtt);
313
314 (bandwidth, rtt, connections)
315 }
316
317 pub fn high_speed_terrestiral_network() -> (u64, Duration, (Endpoint, Endpoint)) {
318 let bandwidth = 1_000_000_000;
319 let rtt = Duration::from_millis(1);
320 let connections = new_constrained_connection(bandwidth, rtt);
321
322 (bandwidth, rtt, connections)
323 }
324
325 pub fn ultra_high_speed_lan() -> (u64, Duration, (Endpoint, Endpoint)) {
326 let bandwidth = 100_000_000_000;
327 let rtt = Duration::from_micros(30);
328 let connections = new_constrained_connection(bandwidth, rtt);
329
330 (bandwidth, rtt, connections)
331 }
332
333 pub fn iter_all(
334 ) -> impl Iterator<Item = (String, fn() -> (u64, Duration, (Endpoint, Endpoint)))> {
335 vec![
336 (
337 "Satellite Network ".to_string(),
338 satellite_network as fn() -> (u64, Duration, (Endpoint, Endpoint)),
339 ),
340 (
341 "Residential DSL ".to_string(),
342 residential_dsl as fn() -> (u64, Duration, (Endpoint, Endpoint)),
343 ),
344 (
345 "Mobile HSDPA ".to_string(),
346 mobile_hsdpa as fn() -> (u64, Duration, (Endpoint, Endpoint)),
347 ),
348 (
349 "Residential ADSL2+ ".to_string(),
350 residential_adsl2 as fn() -> (u64, Duration, (Endpoint, Endpoint)),
351 ),
352 (
353 "Residential Cable Internet".to_string(),
354 residential_cable_internet as fn() -> (u64, Duration, (Endpoint, Endpoint)),
355 ),
356 (
357 "GBit LAN ".to_string(),
358 gbit_lan as fn() -> (u64, Duration, (Endpoint, Endpoint)),
359 ),
360 (
361 "High Speed Terrestiral Net".to_string(),
362 high_speed_terrestiral_network as fn() -> (u64, Duration, (Endpoint, Endpoint)),
363 ),
364 (
365 "Ultra High Speed LAN ".to_string(),
366 ultra_high_speed_lan as fn() -> (u64, Duration, (Endpoint, Endpoint)),
367 ),
368 ]
369 .into_iter()
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use super::*;
376 use futures::task::Spawn;
377 use futures::{AsyncReadExt, AsyncWriteExt};
378 use quickcheck::{Gen, QuickCheck, TestResult};
379 use std::time::Instant;
380
381 #[test]
382 fn quickcheck() {
383 fn prop(msg: Vec<u8>, bandwidth: u32, rtt: u64) -> TestResult {
384 let start = Instant::now();
385
386 let bandwidth = bandwidth % 1024 * 1024 * 1024; let rtt = Duration::from_micros(rtt % Duration::from_secs(1).as_millis() as u64); if bandwidth == 0
390 || rtt == Duration::from_micros(1)
391 || msg.is_empty()
392 || single_direction_capacity_bytes(bandwidth as u64, rtt) < 1
393 {
394 return TestResult::discard();
395 }
396
397 let (mut a, mut b) = new_constrained_connection(bandwidth as u64, rtt);
398
399 let mut pool = futures::executor::LocalPool::new();
400
401 let msg_clone = msg.clone();
402 pool.spawner()
403 .spawn_obj(
404 async move {
405 a.write_all(&msg_clone).await.unwrap();
406 }
407 .boxed()
408 .into(),
409 )
410 .unwrap();
411
412 pool.run_until(async {
413 let mut received_msg = Vec::new();
414 b.read_to_end(&mut received_msg).await.unwrap();
415
416 assert_eq!(msg, received_msg);
417 });
418
419 let duration = start.elapsed();
420
421 println!(
422 "bandwidth {} KiB/s, rtt {}s duration {}s, msg len {} KiB, percentage {}",
423 bandwidth / 1024,
424 rtt.as_secs_f64(),
425 duration.as_secs_f64(),
426 msg.len() / 1024 * 8,
427 (bandwidth as f64 * (duration.as_secs_f64() - rtt.as_secs_f64() / 2.0))
428 / (msg.len() * 8) as f64
429 );
430
431 TestResult::passed()
432 }
433
434 QuickCheck::new()
435 .gen(Gen::new(1_000_000))
436 .quickcheck(prop as fn(_, _, _) -> _)
437 }
438}