veilid_tools/tests/native/
test_assembly_buffer.rs

1use rand::seq::SliceRandom;
2
3use crate::*;
4
5fn random_sockaddr() -> SocketAddr {
6    if get_random_u32() & 1 == 0 {
7        let mut addr = [0u8; 16];
8        random_bytes(&mut addr);
9        let port = get_random_u32() as u16;
10        SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(addr), port, 0, 0))
11    } else {
12        let mut addr = [0u8; 4];
13        random_bytes(&mut addr);
14        let port = get_random_u32() as u16;
15        SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(addr), port))
16    }
17}
18
19pub async fn test_single_out_in() {
20    info!("-- test_single_out_in");
21    let assbuf_out = AssemblyBuffer::new();
22    let assbuf_in = AssemblyBuffer::new();
23    let (net_tx, net_rx) = flume::unbounded();
24    let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
25        let net_tx = net_tx.clone();
26        async move {
27            net_tx
28                .send_async((framed_chunk, remote_addr))
29                .await
30                .expect("should send");
31            Ok(NetworkResult::value(()))
32        }
33    };
34
35    for _ in 0..1000 {
36        let random_len = (get_random_u32() % 1000) as usize;
37        let mut message = vec![1u8; random_len];
38        random_bytes(&mut message);
39        let remote_addr = random_sockaddr();
40
41        // Send single message below fragmentation limit
42        assert!(matches!(
43            assbuf_out
44                .split_message(message.clone(), remote_addr, sender)
45                .await,
46            Ok(NetworkResult::Value(()))
47        ));
48
49        // Ensure we didn't fragment
50        let (frame, r_remote_addr) = net_rx.recv_async().await.expect("should recv");
51
52        // Send to input
53        let r_message = assbuf_in
54            .insert_frame(&frame, r_remote_addr)
55            .into_io_result()
56            .expect("should get a value")
57            .expect("should get something out");
58
59        // We should have gotten the same message
60        assert_eq!(r_message, message);
61        assert_eq!(r_remote_addr, remote_addr);
62    }
63
64    // Shoud have consumed everything
65    assert!(net_rx.is_empty())
66}
67
68pub async fn test_one_frag_out_in() {
69    info!("-- test_one_frag_out_in");
70    let assbuf_out = AssemblyBuffer::new();
71    let assbuf_in = AssemblyBuffer::new();
72    let (net_tx, net_rx) = flume::unbounded();
73    let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
74        let net_tx = net_tx.clone();
75        async move {
76            net_tx
77                .send_async((framed_chunk, remote_addr))
78                .await
79                .expect("should send");
80            Ok(NetworkResult::value(()))
81        }
82    };
83
84    let mut all_sent = HashSet::new();
85
86    // Sending
87    info!("sending");
88    for _ in 0..10000 {
89        let to_send = loop {
90            let random_len = (get_random_u32() % 1000) as usize + FRAGMENT_LEN;
91            let mut message = vec![1u8; random_len];
92            random_bytes(&mut message);
93            let remote_addr = random_sockaddr();
94
95            let to_send = (message, remote_addr);
96
97            if !all_sent.contains(&to_send) {
98                break to_send;
99            }
100        };
101
102        // Send single message above fragmentation limit
103        all_sent.insert(to_send.clone());
104        assert!(matches!(
105            assbuf_out.split_message(to_send.0, to_send.1, sender).await,
106            Ok(NetworkResult::Value(()))
107        ));
108    }
109
110    info!("all_sent len={}", all_sent.len());
111
112    info!("fragments sent = {}", net_rx.len());
113
114    drop(net_tx);
115
116    // Receiving
117    info!("receiving");
118
119    while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
120        // Send to input
121        let r_message = assbuf_in
122            .insert_frame(&frame, r_remote_addr)
123            .into_io_result()
124            .expect("should get a value");
125
126        // We should have gotten the same message
127        if let Some(r_message) = r_message {
128            assert!(all_sent.remove(&(r_message, r_remote_addr)));
129        }
130    }
131    info!("all_sent len={}", all_sent.len());
132
133    // Shoud have dropped no packets
134    assert_eq!(all_sent.len(), 0);
135}
136
137pub async fn test_many_frags_out_in() {
138    info!("-- test_many_frags_out_in");
139    let assbuf_out = AssemblyBuffer::new();
140    let assbuf_in = AssemblyBuffer::new();
141    let (net_tx, net_rx) = flume::unbounded();
142    let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
143        let net_tx = net_tx.clone();
144        async move {
145            net_tx
146                .send_async((framed_chunk, remote_addr))
147                .await
148                .expect("should send");
149            Ok(NetworkResult::value(()))
150        }
151    };
152
153    let mut all_sent = HashSet::new();
154
155    // Sending
156    let mut total_sent_size = 0usize;
157    info!("sending");
158    for _ in 0..1000 {
159        let to_send = loop {
160            let random_len = (get_random_u32() % 65536) as usize;
161            let mut message = vec![1u8; random_len];
162            random_bytes(&mut message);
163            let remote_addr = random_sockaddr();
164            let to_send = (message, remote_addr);
165
166            if !all_sent.contains(&to_send) {
167                break to_send;
168            }
169        };
170
171        // Send single message
172        all_sent.insert(to_send.clone());
173        total_sent_size += to_send.0.len();
174
175        assert!(matches!(
176            assbuf_out.split_message(to_send.0, to_send.1, sender).await,
177            Ok(NetworkResult::Value(()))
178        ));
179    }
180
181    info!("all_sent len={}", all_sent.len());
182    info!("total_sent_size = {}", total_sent_size);
183    info!("fragments sent = {}", net_rx.len());
184
185    drop(net_tx);
186
187    // Receiving
188    info!("receiving");
189
190    while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
191        // Send to input
192        let r_message = assbuf_in
193            .insert_frame(&frame, r_remote_addr)
194            .into_io_result()
195            .expect("should get a value");
196
197        // We should have gotten the same message
198        if let Some(r_message) = r_message {
199            assert!(all_sent.remove(&(r_message, r_remote_addr)));
200        }
201    }
202    info!("all_sent len={}", all_sent.len());
203
204    // Shoud have dropped no packets
205    assert_eq!(all_sent.len(), 0);
206}
207
208pub async fn test_many_frags_out_in_single_host() {
209    info!("-- test_many_frags_out_in_single_host");
210    let assbuf_out = AssemblyBuffer::new();
211    let assbuf_in = AssemblyBuffer::new();
212    let (net_tx, net_rx) = flume::unbounded();
213    let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
214        let net_tx = net_tx.clone();
215        async move {
216            net_tx
217                .send_async((framed_chunk, remote_addr))
218                .await
219                .expect("should send");
220            Ok(NetworkResult::value(()))
221        }
222    };
223
224    let mut all_sent = HashSet::new();
225
226    // Sending
227    let mut total_sent_size = 0usize;
228    info!("sending");
229    for _ in 0..1000 {
230        let to_send = loop {
231            let remote_addr = random_sockaddr();
232            let random_len = (get_random_u32() % 65536) as usize;
233            let mut message = vec![1u8; random_len];
234            random_bytes(&mut message);
235
236            let to_send = (message.clone(), remote_addr);
237
238            if !all_sent.contains(&to_send) {
239                break to_send;
240            }
241        };
242
243        // Send single message
244        all_sent.insert(to_send.clone());
245        total_sent_size += to_send.0.len();
246        assert!(matches!(
247            assbuf_out.split_message(to_send.0, to_send.1, sender).await,
248            Ok(NetworkResult::Value(()))
249        ));
250    }
251
252    info!("all_sent len={}", all_sent.len());
253    info!("total_sent_size = {}", total_sent_size);
254    info!("fragments sent = {}", net_rx.len());
255
256    drop(net_tx);
257
258    // Receiving
259    info!("receiving");
260
261    while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
262        // Send to input
263        let r_message = assbuf_in
264            .insert_frame(&frame, r_remote_addr)
265            .into_io_result()
266            .expect("should get a value");
267
268        // We should have gotten the same message
269        if let Some(r_message) = r_message {
270            assert!(all_sent.remove(&(r_message, r_remote_addr)));
271        }
272    }
273    info!("all_sent len={}", all_sent.len());
274
275    // Shoud have dropped no packets
276    assert_eq!(all_sent.len(), 0);
277}
278
279pub async fn test_many_frags_with_drops() {
280    info!("-- test_many_frags_with_drops");
281    let assbuf_out = AssemblyBuffer::new();
282    let assbuf_in = AssemblyBuffer::new();
283    let (net_tx, net_rx) = flume::unbounded();
284
285    let first = Arc::new(AtomicBool::new(true));
286
287    let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
288        let net_tx = net_tx.clone();
289        let first = first.clone();
290        async move {
291            // Send only first packet, drop rest
292            if first.swap(false, Ordering::AcqRel) {
293                net_tx
294                    .send_async((framed_chunk, remote_addr))
295                    .await
296                    .expect("should send");
297            }
298            Ok(NetworkResult::value(()))
299        }
300    };
301
302    let mut all_sent = HashSet::new();
303
304    // Sending
305    let mut total_sent_size = 0usize;
306    let mut total_fragged = 0usize;
307    info!("sending");
308    for _ in 0..1000 {
309        let to_send = loop {
310            let remote_addr = random_sockaddr();
311            let random_len = (get_random_u32() % 65536) as usize;
312            if random_len > FRAGMENT_LEN {
313                total_fragged += 1;
314            }
315            let mut message = vec![1u8; random_len];
316            random_bytes(&mut message);
317
318            let to_send = (message.clone(), remote_addr);
319
320            if !all_sent.contains(&to_send) {
321                break to_send;
322            }
323        };
324
325        // Send single message
326        all_sent.insert(to_send.clone());
327        total_sent_size += to_send.0.len();
328
329        assert!(matches!(
330            assbuf_out.split_message(to_send.0, to_send.1, sender).await,
331            Ok(NetworkResult::Value(()))
332        ));
333
334        first.store(true, Ordering::Release);
335    }
336
337    info!("all_sent len={}", all_sent.len());
338    info!("total_sent_size = {}", total_sent_size);
339    info!("fragments sent = {}", net_rx.len());
340    info!("total_fragged = {}", total_fragged);
341    drop(net_tx);
342
343    // Receiving
344    info!("receiving");
345
346    while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
347        // Send to input
348        let r_message = assbuf_in
349            .insert_frame(&frame, r_remote_addr)
350            .into_io_result()
351            .expect("should get a value");
352
353        // We should have gotten the same message
354        if let Some(r_message) = r_message {
355            assert!(all_sent.remove(&(r_message, r_remote_addr)));
356        }
357    }
358    info!("all_sent len={}", all_sent.len());
359
360    // Shoud have dropped all fragged packets
361    assert_eq!(all_sent.len(), total_fragged);
362}
363
364pub async fn test_many_frags_reordered() {
365    info!("-- test_many_frags_reordered");
366    let assbuf_out = AssemblyBuffer::new();
367    let assbuf_in = AssemblyBuffer::new();
368    let (net_tx, net_rx) = flume::unbounded();
369
370    let reorder_buffer = Arc::new(Mutex::new(Vec::new()));
371    let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
372        let reorder_buffer = reorder_buffer.clone();
373        async move {
374            reorder_buffer.lock().push((framed_chunk, remote_addr));
375            Ok(NetworkResult::Value(()))
376        }
377    };
378
379    let mut all_sent = HashSet::new();
380
381    // Sending
382    let mut total_sent_size = 0usize;
383    let mut rng = rand::thread_rng();
384    info!("sending");
385    for _ in 0..1000 {
386        let to_send = loop {
387            let random_len = (get_random_u32() % 65536) as usize;
388            let mut message = vec![1u8; random_len];
389            random_bytes(&mut message);
390            let remote_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 5678));
391
392            let to_send = (message.clone(), remote_addr);
393
394            if !all_sent.contains(&to_send) {
395                break to_send;
396            }
397        };
398
399        // Send single message
400        all_sent.insert(to_send.clone());
401        total_sent_size += to_send.0.len();
402        assert!(matches!(
403            assbuf_out.split_message(to_send.0, to_send.1, sender).await,
404            Ok(NetworkResult::Value(()))
405        ));
406
407        // Shuffle fragments
408        let items = {
409            let mut rbinner = reorder_buffer.lock();
410            rbinner.shuffle(&mut rng);
411            let items = rbinner.clone();
412            rbinner.clear();
413            items
414        };
415        for p in items {
416            net_tx.send_async(p).await.expect("should send");
417        }
418    }
419
420    info!("all_sent len={}", all_sent.len());
421    info!("total_sent_size = {}", total_sent_size);
422    info!("fragments sent = {}", net_rx.len());
423
424    drop(net_tx);
425
426    // Receiving
427    info!("receiving");
428
429    while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
430        // Send to input
431        let r_message = assbuf_in
432            .insert_frame(&frame, r_remote_addr)
433            .into_io_result()
434            .expect("should get a value");
435
436        // We should have gotten the same message
437        if let Some(r_message) = r_message {
438            assert!(all_sent.remove(&(r_message, r_remote_addr)));
439        }
440    }
441    info!("all_sent len={}", all_sent.len());
442
443    // Shoud have dropped no packets
444    assert_eq!(all_sent.len(), 0);
445}
446
447pub async fn test_all() {
448    test_single_out_in().await;
449    test_one_frag_out_in().await;
450    test_many_frags_out_in().await;
451    test_many_frags_out_in_single_host().await;
452    test_many_frags_with_drops().await;
453    test_many_frags_reordered().await;
454}