constrained_connection/
lib.rs

1//! Simulate constrained network connections.
2//!
3//! Can be used to benchmark networking logic build on top of a stream oriented
4//! connection (e.g. TCP). Create a connection by specifying its bandwidth as
5//! well as its round trip time (delay). The connection will delay each bytes
6//! chunk by the configured delay and allow at most [bandwidth-delay
7//! product](https://en.wikipedia.org/wiki/Bandwidth-delay_product) number of
8//! bytes on the *wire* enforcing backpressure.
9//!
10//! ```
11//! # use constrained_connection::{Endpoint, new_constrained_connection};
12//! # use futures::task::Spawn;
13//! # use futures::{AsyncReadExt, AsyncWriteExt};
14//! # use std::time::Duration;
15//! # use std::time::Instant;
16//! # use futures::future::FutureExt;
17//! let msg = vec![0; 10 * 1024 * 1024];
18//! let msg_clone = msg.clone();
19//! let start = Instant::now();
20//! let mut pool = futures::executor::LocalPool::new();
21//!
22//! let bandwidth = 1_000_000_000;
23//! let rtt = Duration::from_micros(100);
24//! let (mut a, mut b) = new_constrained_connection(bandwidth, rtt);
25//!
26//! pool.spawner().spawn_obj(async move {
27//!     a.write_all(&msg_clone).await.unwrap();
28//! }.boxed().into()).unwrap();
29//!
30//! pool.run_until(async {
31//!     let mut received_msg = Vec::new();
32//!     b.read_to_end(&mut received_msg).await.unwrap();
33//!
34//!     assert_eq!(msg, received_msg);
35//! });
36//!
37//! let duration = start.elapsed();
38//!
39//! println!(
40//!     "Bandwidth {} KiB/s, RTT {:.5} s, Payload length {} KiB, duration {:.5} s",
41//!     bandwidth / 1024, rtt.as_secs_f64(), msg.len() / 1024, duration.as_secs_f64(),
42//! );
43//! ```
44//!
45//! For now, as the library is not properly optimized, you can not simulate high
46//! speed networks. Execute the `examples/accuracy.rs` binary for details.
47//!
48//! ```bash
49//! $ cargo run --example accuracy --release
50//!
51//! Name                            Bandwidth       RTT             Payload         Duration        Acurracy
52//! Satellite Network               500 KiB/s       0.90000 s       10240 KiB       164.49 s        1.00 %
53//! Residential DSL                 1953 KiB/s      0.05000 s       10240 KiB       42.97 s         1.02 %
54//! Mobile HSDPA                    5859 KiB/s      0.10000 s       10240 KiB       14.19 s         1.01 %
55//! Residential ADSL2+              19531 KiB/s     0.05000 s       10240 KiB       4.33 s          1.03 %
56//! Residential Cable Internet      195312 KiB/s    0.02000 s       10240 KiB       0.46 s          1.07 %
57//! GBit LAN                        976562 KiB/s    0.00010 s       10240 KiB       0.26 s          3.16 %
58//! High Speed Terrestiral Net      976562 KiB/s    0.00100 s       10240 KiB       0.13 s          1.56 %
59//! Ultra High Speed LAN            97656250 KiB/s  0.00003 s       10240 KiB       0.01 s          16.08 %
60//! ```
61
62use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
63use futures::future::FutureExt;
64use futures::ready;
65use futures::sink::SinkExt;
66use futures::stream::StreamExt;
67use futures::{AsyncRead, AsyncWrite};
68use futures_timer::Delay;
69use std::io::{Error, ErrorKind, Result};
70use std::pin::Pin;
71use std::sync::{Arc, Mutex};
72use std::task::{Context, Poll, Waker};
73use std::time::Duration;
74
75pub struct Endpoint {
76    sender: UnboundedSender<Item>,
77
78    receiver: UnboundedReceiver<Item>,
79    next_item: Option<Item>,
80
81    shared_send: Arc<Mutex<Shared>>,
82    shared_receive: Arc<Mutex<Shared>>,
83
84    delay: Duration,
85    capacity: usize,
86}
87
88/// Create a new [`Endpoint`] pair.
89///
90/// `bandwidth` being the bandwidth in bits per second.
91///
92/// `rtt` being the round trip time.
93pub fn new_constrained_connection(
94    bandwidth_bits_per_second: u64,
95    rtt: Duration,
96) -> (Endpoint, Endpoint) {
97    let single_direction_capacity_bytes =
98        single_direction_capacity_bytes(bandwidth_bits_per_second, rtt);
99    assert!(single_direction_capacity_bytes > 0);
100    let single_direction_delay = rtt / 2;
101
102    let (a_to_b_sender, a_to_b_receiver) = unbounded();
103    let (b_to_a_sender, b_to_a_receiver) = unbounded();
104
105    let a_to_b_shared = Arc::new(Mutex::new(Default::default()));
106    let b_to_a_shared = Arc::new(Mutex::new(Default::default()));
107
108    let a = Endpoint {
109        sender: a_to_b_sender,
110        receiver: b_to_a_receiver,
111        next_item: None,
112
113        shared_send: a_to_b_shared.clone(),
114        shared_receive: b_to_a_shared.clone(),
115
116        delay: single_direction_delay,
117        capacity: single_direction_capacity_bytes,
118    };
119
120    let b = Endpoint {
121        sender: b_to_a_sender,
122        receiver: a_to_b_receiver,
123        next_item: None,
124
125        shared_send: b_to_a_shared,
126        shared_receive: a_to_b_shared,
127
128        delay: single_direction_delay,
129        capacity: single_direction_capacity_bytes,
130    };
131
132    (a, b)
133}
134
135pub fn new_unconstrained_connection() -> (Endpoint, Endpoint) {
136    let (a_to_b_sender, a_to_b_receiver) = unbounded();
137    let (b_to_a_sender, b_to_a_receiver) = unbounded();
138
139    let a_to_b_shared = Arc::new(Mutex::new(Default::default()));
140    let b_to_a_shared = Arc::new(Mutex::new(Default::default()));
141
142    let a = Endpoint {
143        sender: a_to_b_sender,
144        receiver: b_to_a_receiver,
145        next_item: None,
146
147        shared_send: a_to_b_shared.clone(),
148        shared_receive: b_to_a_shared.clone(),
149
150        delay: Duration::from_secs(0),
151        capacity: std::usize::MAX,
152    };
153
154    let b = Endpoint {
155        sender: b_to_a_sender,
156        receiver: a_to_b_receiver,
157        next_item: None,
158
159        shared_send: b_to_a_shared,
160        shared_receive: a_to_b_shared,
161
162        delay: Duration::from_secs(0),
163        capacity: std::usize::MAX,
164    };
165
166    (a, b)
167}
168
169struct Item {
170    data: Vec<u8>,
171    delay: Delay,
172}
173
174impl Unpin for Endpoint {}
175
176impl AsyncRead for Endpoint {
177    fn poll_read(
178        mut self: Pin<&mut Self>,
179        cx: &mut Context<'_>,
180        buf: &mut [u8],
181    ) -> Poll<Result<usize>> {
182        let item = match self.next_item.as_mut() {
183            Some(item) => item,
184            None => match ready!(self.receiver.poll_next_unpin(cx)) {
185                Some(item) => {
186                    self.next_item = Some(item);
187                    self.next_item.as_mut().unwrap()
188                }
189                None => {
190                    return Poll::Ready(Ok(0));
191                }
192            },
193        };
194
195        ready!(item.delay.poll_unpin(cx));
196
197        let n = std::cmp::min(buf.len(), item.data.len());
198
199        buf[0..n].copy_from_slice(&item.data[0..n]);
200
201        if n < item.data.len() {
202            item.data = item.data.split_off(n);
203        } else {
204            self.next_item.take().unwrap();
205        }
206
207        let mut shared = self.shared_receive.lock().unwrap();
208        if let Some(waker) = shared.waker_write.take() {
209            waker.wake();
210        }
211
212        debug_assert!(shared.size >= n);
213        shared.size -= n;
214
215        Poll::Ready(Ok(n))
216    }
217}
218
219impl AsyncWrite for Endpoint {
220    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
221        let mut shared = self.shared_send.lock().unwrap();
222        let n = std::cmp::min(self.capacity - shared.size, buf.len());
223        if n == 0 {
224            shared.waker_write = Some(cx.waker().clone());
225            return Poll::Pending;
226        }
227
228        self.sender
229            .unbounded_send(Item {
230                data: buf[0..n].to_vec(),
231                delay: Delay::new(self.delay),
232            })
233            .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))?;
234
235        shared.size += n;
236
237        Poll::Ready(Ok(n))
238    }
239
240    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
241        ready!(self.sender.poll_flush_unpin(cx)).unwrap();
242        Poll::Ready(Ok(()))
243    }
244
245    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
246        ready!(self.sender.poll_close_unpin(cx)).unwrap();
247        Poll::Ready(Ok(()))
248    }
249}
250
251#[derive(Default)]
252struct Shared {
253    waker_write: Option<Waker>,
254    size: usize,
255}
256
257fn single_direction_capacity_bytes(bandwidth_bits_per_second: u64, rtt: Duration) -> usize {
258    let bandwidth_delay_product: u128 =
259        bandwidth_bits_per_second as u128 * rtt.as_micros() / 1_000_000u128 / 8;
260    (bandwidth_delay_product / 2) as usize
261}
262
263/// Samples based on numbers from
264/// https://en.wikipedia.org/wiki/Bandwidth-delay_product#examples
265pub mod samples {
266    use super::{new_constrained_connection, Endpoint};
267    use std::time::Duration;
268
269    pub fn satellite_network() -> (u64, Duration, (Endpoint, Endpoint)) {
270        let bandwidth = 512_000;
271        let rtt = Duration::from_millis(900);
272        let connections = new_constrained_connection(bandwidth, rtt);
273
274        (bandwidth, rtt, connections)
275    }
276
277    pub fn residential_dsl() -> (u64, Duration, (Endpoint, Endpoint)) {
278        let bandwidth = 2_000_000;
279        let rtt = Duration::from_millis(50);
280        let connections = new_constrained_connection(bandwidth, rtt);
281
282        (bandwidth, rtt, connections)
283    }
284
285    pub fn mobile_hsdpa() -> (u64, Duration, (Endpoint, Endpoint)) {
286        let bandwidth = 6_000_000;
287        let rtt = Duration::from_millis(100);
288        let connections = new_constrained_connection(bandwidth, rtt);
289
290        (bandwidth, rtt, connections)
291    }
292
293    pub fn residential_adsl2() -> (u64, Duration, (Endpoint, Endpoint)) {
294        let bandwidth = 20_000_000;
295        let rtt = Duration::from_millis(50);
296        let connections = new_constrained_connection(bandwidth, rtt);
297
298        (bandwidth, rtt, connections)
299    }
300
301    pub fn residential_cable_internet() -> (u64, Duration, (Endpoint, Endpoint)) {
302        let bandwidth = 200_000_000;
303        let rtt = Duration::from_millis(20);
304        let connections = new_constrained_connection(bandwidth, rtt);
305
306        (bandwidth, rtt, connections)
307    }
308
309    pub fn gbit_lan() -> (u64, Duration, (Endpoint, Endpoint)) {
310        let bandwidth = 1_000_000_000;
311        let rtt = Duration::from_micros(100);
312        let connections = new_constrained_connection(bandwidth, rtt);
313
314        (bandwidth, rtt, connections)
315    }
316
317    pub fn high_speed_terrestiral_network() -> (u64, Duration, (Endpoint, Endpoint)) {
318        let bandwidth = 1_000_000_000;
319        let rtt = Duration::from_millis(1);
320        let connections = new_constrained_connection(bandwidth, rtt);
321
322        (bandwidth, rtt, connections)
323    }
324
325    pub fn ultra_high_speed_lan() -> (u64, Duration, (Endpoint, Endpoint)) {
326        let bandwidth = 100_000_000_000;
327        let rtt = Duration::from_micros(30);
328        let connections = new_constrained_connection(bandwidth, rtt);
329
330        (bandwidth, rtt, connections)
331    }
332
333    pub fn iter_all(
334    ) -> impl Iterator<Item = (String, fn() -> (u64, Duration, (Endpoint, Endpoint)))> {
335        vec![
336            (
337                "Satellite Network         ".to_string(),
338                satellite_network as fn() -> (u64, Duration, (Endpoint, Endpoint)),
339            ),
340            (
341                "Residential DSL           ".to_string(),
342                residential_dsl as fn() -> (u64, Duration, (Endpoint, Endpoint)),
343            ),
344            (
345                "Mobile HSDPA              ".to_string(),
346                mobile_hsdpa as fn() -> (u64, Duration, (Endpoint, Endpoint)),
347            ),
348            (
349                "Residential ADSL2+        ".to_string(),
350                residential_adsl2 as fn() -> (u64, Duration, (Endpoint, Endpoint)),
351            ),
352            (
353                "Residential Cable Internet".to_string(),
354                residential_cable_internet as fn() -> (u64, Duration, (Endpoint, Endpoint)),
355            ),
356            (
357                "GBit LAN                 ".to_string(),
358                gbit_lan as fn() -> (u64, Duration, (Endpoint, Endpoint)),
359            ),
360            (
361                "High Speed Terrestiral Net".to_string(),
362                high_speed_terrestiral_network as fn() -> (u64, Duration, (Endpoint, Endpoint)),
363            ),
364            (
365                "Ultra High Speed LAN     ".to_string(),
366                ultra_high_speed_lan as fn() -> (u64, Duration, (Endpoint, Endpoint)),
367            ),
368        ]
369        .into_iter()
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376    use futures::task::Spawn;
377    use futures::{AsyncReadExt, AsyncWriteExt};
378    use quickcheck::{Gen, QuickCheck, TestResult};
379    use std::time::Instant;
380
381    #[test]
382    fn quickcheck() {
383        fn prop(msg: Vec<u8>, bandwidth: u32, rtt: u64) -> TestResult {
384            let start = Instant::now();
385
386            let bandwidth = bandwidth % 1024 * 1024 * 1024; // No more than 1 GiB.
387            let rtt = Duration::from_micros(rtt % Duration::from_secs(1).as_millis() as u64); // No more than 1 second.
388
389            if bandwidth == 0
390                || rtt == Duration::from_micros(1)
391                || msg.is_empty()
392                || single_direction_capacity_bytes(bandwidth as u64, rtt) < 1
393            {
394                return TestResult::discard();
395            }
396
397            let (mut a, mut b) = new_constrained_connection(bandwidth as u64, rtt);
398
399            let mut pool = futures::executor::LocalPool::new();
400
401            let msg_clone = msg.clone();
402            pool.spawner()
403                .spawn_obj(
404                    async move {
405                        a.write_all(&msg_clone).await.unwrap();
406                    }
407                    .boxed()
408                    .into(),
409                )
410                .unwrap();
411
412            pool.run_until(async {
413                let mut received_msg = Vec::new();
414                b.read_to_end(&mut received_msg).await.unwrap();
415
416                assert_eq!(msg, received_msg);
417            });
418
419            let duration = start.elapsed();
420
421            println!(
422                "bandwidth {} KiB/s, rtt {}s duration {}s, msg len {} KiB, percentage {}",
423                bandwidth / 1024,
424                rtt.as_secs_f64(),
425                duration.as_secs_f64(),
426                msg.len() / 1024 * 8,
427                (bandwidth as f64 * (duration.as_secs_f64() - rtt.as_secs_f64() / 2.0))
428                    / (msg.len() * 8) as f64
429            );
430
431            TestResult::passed()
432        }
433
434        QuickCheck::new()
435            .gen(Gen::new(1_000_000))
436            .quickcheck(prop as fn(_, _, _) -> _)
437    }
438}