veilid_tools/tests/native/
test_assembly_buffer.rs1use rand::seq::SliceRandom;
2
3use crate::*;
4
5fn random_sockaddr() -> SocketAddr {
6 if get_random_u32() & 1 == 0 {
7 let mut addr = [0u8; 16];
8 random_bytes(&mut addr);
9 let port = get_random_u32() as u16;
10 SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(addr), port, 0, 0))
11 } else {
12 let mut addr = [0u8; 4];
13 random_bytes(&mut addr);
14 let port = get_random_u32() as u16;
15 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(addr), port))
16 }
17}
18
19pub async fn test_single_out_in() {
20 info!("-- test_single_out_in");
21 let assbuf_out = AssemblyBuffer::new();
22 let assbuf_in = AssemblyBuffer::new();
23 let (net_tx, net_rx) = flume::unbounded();
24 let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
25 let net_tx = net_tx.clone();
26 async move {
27 net_tx
28 .send_async((framed_chunk, remote_addr))
29 .await
30 .expect("should send");
31 Ok(NetworkResult::value(()))
32 }
33 };
34
35 for _ in 0..1000 {
36 let random_len = (get_random_u32() % 1000) as usize;
37 let mut message = vec![1u8; random_len];
38 random_bytes(&mut message);
39 let remote_addr = random_sockaddr();
40
41 assert!(matches!(
43 assbuf_out
44 .split_message(message.clone(), remote_addr, sender)
45 .await,
46 Ok(NetworkResult::Value(()))
47 ));
48
49 let (frame, r_remote_addr) = net_rx.recv_async().await.expect("should recv");
51
52 let r_message = assbuf_in
54 .insert_frame(&frame, r_remote_addr)
55 .into_io_result()
56 .expect("should get a value")
57 .expect("should get something out");
58
59 assert_eq!(r_message, message);
61 assert_eq!(r_remote_addr, remote_addr);
62 }
63
64 assert!(net_rx.is_empty())
66}
67
68pub async fn test_one_frag_out_in() {
69 info!("-- test_one_frag_out_in");
70 let assbuf_out = AssemblyBuffer::new();
71 let assbuf_in = AssemblyBuffer::new();
72 let (net_tx, net_rx) = flume::unbounded();
73 let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
74 let net_tx = net_tx.clone();
75 async move {
76 net_tx
77 .send_async((framed_chunk, remote_addr))
78 .await
79 .expect("should send");
80 Ok(NetworkResult::value(()))
81 }
82 };
83
84 let mut all_sent = HashSet::new();
85
86 info!("sending");
88 for _ in 0..10000 {
89 let to_send = loop {
90 let random_len = (get_random_u32() % 1000) as usize + FRAGMENT_LEN;
91 let mut message = vec![1u8; random_len];
92 random_bytes(&mut message);
93 let remote_addr = random_sockaddr();
94
95 let to_send = (message, remote_addr);
96
97 if !all_sent.contains(&to_send) {
98 break to_send;
99 }
100 };
101
102 all_sent.insert(to_send.clone());
104 assert!(matches!(
105 assbuf_out.split_message(to_send.0, to_send.1, sender).await,
106 Ok(NetworkResult::Value(()))
107 ));
108 }
109
110 info!("all_sent len={}", all_sent.len());
111
112 info!("fragments sent = {}", net_rx.len());
113
114 drop(net_tx);
115
116 info!("receiving");
118
119 while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
120 let r_message = assbuf_in
122 .insert_frame(&frame, r_remote_addr)
123 .into_io_result()
124 .expect("should get a value");
125
126 if let Some(r_message) = r_message {
128 assert!(all_sent.remove(&(r_message, r_remote_addr)));
129 }
130 }
131 info!("all_sent len={}", all_sent.len());
132
133 assert_eq!(all_sent.len(), 0);
135}
136
137pub async fn test_many_frags_out_in() {
138 info!("-- test_many_frags_out_in");
139 let assbuf_out = AssemblyBuffer::new();
140 let assbuf_in = AssemblyBuffer::new();
141 let (net_tx, net_rx) = flume::unbounded();
142 let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
143 let net_tx = net_tx.clone();
144 async move {
145 net_tx
146 .send_async((framed_chunk, remote_addr))
147 .await
148 .expect("should send");
149 Ok(NetworkResult::value(()))
150 }
151 };
152
153 let mut all_sent = HashSet::new();
154
155 let mut total_sent_size = 0usize;
157 info!("sending");
158 for _ in 0..1000 {
159 let to_send = loop {
160 let random_len = (get_random_u32() % 65536) as usize;
161 let mut message = vec![1u8; random_len];
162 random_bytes(&mut message);
163 let remote_addr = random_sockaddr();
164 let to_send = (message, remote_addr);
165
166 if !all_sent.contains(&to_send) {
167 break to_send;
168 }
169 };
170
171 all_sent.insert(to_send.clone());
173 total_sent_size += to_send.0.len();
174
175 assert!(matches!(
176 assbuf_out.split_message(to_send.0, to_send.1, sender).await,
177 Ok(NetworkResult::Value(()))
178 ));
179 }
180
181 info!("all_sent len={}", all_sent.len());
182 info!("total_sent_size = {}", total_sent_size);
183 info!("fragments sent = {}", net_rx.len());
184
185 drop(net_tx);
186
187 info!("receiving");
189
190 while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
191 let r_message = assbuf_in
193 .insert_frame(&frame, r_remote_addr)
194 .into_io_result()
195 .expect("should get a value");
196
197 if let Some(r_message) = r_message {
199 assert!(all_sent.remove(&(r_message, r_remote_addr)));
200 }
201 }
202 info!("all_sent len={}", all_sent.len());
203
204 assert_eq!(all_sent.len(), 0);
206}
207
208pub async fn test_many_frags_out_in_single_host() {
209 info!("-- test_many_frags_out_in_single_host");
210 let assbuf_out = AssemblyBuffer::new();
211 let assbuf_in = AssemblyBuffer::new();
212 let (net_tx, net_rx) = flume::unbounded();
213 let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
214 let net_tx = net_tx.clone();
215 async move {
216 net_tx
217 .send_async((framed_chunk, remote_addr))
218 .await
219 .expect("should send");
220 Ok(NetworkResult::value(()))
221 }
222 };
223
224 let mut all_sent = HashSet::new();
225
226 let mut total_sent_size = 0usize;
228 info!("sending");
229 for _ in 0..1000 {
230 let to_send = loop {
231 let remote_addr = random_sockaddr();
232 let random_len = (get_random_u32() % 65536) as usize;
233 let mut message = vec![1u8; random_len];
234 random_bytes(&mut message);
235
236 let to_send = (message.clone(), remote_addr);
237
238 if !all_sent.contains(&to_send) {
239 break to_send;
240 }
241 };
242
243 all_sent.insert(to_send.clone());
245 total_sent_size += to_send.0.len();
246 assert!(matches!(
247 assbuf_out.split_message(to_send.0, to_send.1, sender).await,
248 Ok(NetworkResult::Value(()))
249 ));
250 }
251
252 info!("all_sent len={}", all_sent.len());
253 info!("total_sent_size = {}", total_sent_size);
254 info!("fragments sent = {}", net_rx.len());
255
256 drop(net_tx);
257
258 info!("receiving");
260
261 while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
262 let r_message = assbuf_in
264 .insert_frame(&frame, r_remote_addr)
265 .into_io_result()
266 .expect("should get a value");
267
268 if let Some(r_message) = r_message {
270 assert!(all_sent.remove(&(r_message, r_remote_addr)));
271 }
272 }
273 info!("all_sent len={}", all_sent.len());
274
275 assert_eq!(all_sent.len(), 0);
277}
278
279pub async fn test_many_frags_with_drops() {
280 info!("-- test_many_frags_with_drops");
281 let assbuf_out = AssemblyBuffer::new();
282 let assbuf_in = AssemblyBuffer::new();
283 let (net_tx, net_rx) = flume::unbounded();
284
285 let first = Arc::new(AtomicBool::new(true));
286
287 let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
288 let net_tx = net_tx.clone();
289 let first = first.clone();
290 async move {
291 if first.swap(false, Ordering::AcqRel) {
293 net_tx
294 .send_async((framed_chunk, remote_addr))
295 .await
296 .expect("should send");
297 }
298 Ok(NetworkResult::value(()))
299 }
300 };
301
302 let mut all_sent = HashSet::new();
303
304 let mut total_sent_size = 0usize;
306 let mut total_fragged = 0usize;
307 info!("sending");
308 for _ in 0..1000 {
309 let to_send = loop {
310 let remote_addr = random_sockaddr();
311 let random_len = (get_random_u32() % 65536) as usize;
312 if random_len > FRAGMENT_LEN {
313 total_fragged += 1;
314 }
315 let mut message = vec![1u8; random_len];
316 random_bytes(&mut message);
317
318 let to_send = (message.clone(), remote_addr);
319
320 if !all_sent.contains(&to_send) {
321 break to_send;
322 }
323 };
324
325 all_sent.insert(to_send.clone());
327 total_sent_size += to_send.0.len();
328
329 assert!(matches!(
330 assbuf_out.split_message(to_send.0, to_send.1, sender).await,
331 Ok(NetworkResult::Value(()))
332 ));
333
334 first.store(true, Ordering::Release);
335 }
336
337 info!("all_sent len={}", all_sent.len());
338 info!("total_sent_size = {}", total_sent_size);
339 info!("fragments sent = {}", net_rx.len());
340 info!("total_fragged = {}", total_fragged);
341 drop(net_tx);
342
343 info!("receiving");
345
346 while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
347 let r_message = assbuf_in
349 .insert_frame(&frame, r_remote_addr)
350 .into_io_result()
351 .expect("should get a value");
352
353 if let Some(r_message) = r_message {
355 assert!(all_sent.remove(&(r_message, r_remote_addr)));
356 }
357 }
358 info!("all_sent len={}", all_sent.len());
359
360 assert_eq!(all_sent.len(), total_fragged);
362}
363
364pub async fn test_many_frags_reordered() {
365 info!("-- test_many_frags_reordered");
366 let assbuf_out = AssemblyBuffer::new();
367 let assbuf_in = AssemblyBuffer::new();
368 let (net_tx, net_rx) = flume::unbounded();
369
370 let reorder_buffer = Arc::new(Mutex::new(Vec::new()));
371 let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| {
372 let reorder_buffer = reorder_buffer.clone();
373 async move {
374 reorder_buffer.lock().push((framed_chunk, remote_addr));
375 Ok(NetworkResult::Value(()))
376 }
377 };
378
379 let mut all_sent = HashSet::new();
380
381 let mut total_sent_size = 0usize;
383 let mut rng = rand::thread_rng();
384 info!("sending");
385 for _ in 0..1000 {
386 let to_send = loop {
387 let random_len = (get_random_u32() % 65536) as usize;
388 let mut message = vec![1u8; random_len];
389 random_bytes(&mut message);
390 let remote_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 2, 3, 4), 5678));
391
392 let to_send = (message.clone(), remote_addr);
393
394 if !all_sent.contains(&to_send) {
395 break to_send;
396 }
397 };
398
399 all_sent.insert(to_send.clone());
401 total_sent_size += to_send.0.len();
402 assert!(matches!(
403 assbuf_out.split_message(to_send.0, to_send.1, sender).await,
404 Ok(NetworkResult::Value(()))
405 ));
406
407 let items = {
409 let mut rbinner = reorder_buffer.lock();
410 rbinner.shuffle(&mut rng);
411 let items = rbinner.clone();
412 rbinner.clear();
413 items
414 };
415 for p in items {
416 net_tx.send_async(p).await.expect("should send");
417 }
418 }
419
420 info!("all_sent len={}", all_sent.len());
421 info!("total_sent_size = {}", total_sent_size);
422 info!("fragments sent = {}", net_rx.len());
423
424 drop(net_tx);
425
426 info!("receiving");
428
429 while let Ok((frame, r_remote_addr)) = net_rx.recv_async().await {
430 let r_message = assbuf_in
432 .insert_frame(&frame, r_remote_addr)
433 .into_io_result()
434 .expect("should get a value");
435
436 if let Some(r_message) = r_message {
438 assert!(all_sent.remove(&(r_message, r_remote_addr)));
439 }
440 }
441 info!("all_sent len={}", all_sent.len());
442
443 assert_eq!(all_sent.len(), 0);
445}
446
447pub async fn test_all() {
448 test_single_out_in().await;
449 test_one_frag_out_in().await;
450 test_many_frags_out_in().await;
451 test_many_frags_out_in_single_host().await;
452 test_many_frags_with_drops().await;
453 test_many_frags_reordered().await;
454}