1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2
3use crate::SpeedLimitSession;
4
5#[async_trait::async_trait]
6pub trait DatagramSend: Send + 'static {
7 type Error;
8 async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
9}
10
11#[async_trait::async_trait]
12pub trait DatagramRecv: Send + 'static {
13 type Error;
14 async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
15}
16
17enum ReadState {
18 Idle,
19 Reading((usize, usize)),
20}
21
22enum WriteState {
23 Idle,
24 Writing((usize, usize)),
25}
26
27pub struct LimitDatagramSend<S: DatagramSend> {
28 inner: S,
29 write_limiter: SpeedLimitSession,
30 write_state: WriteState,
31}
32
33impl <S: DatagramSend> LimitDatagramSend<S> {
34 pub fn new(inner: S, write_limiter: SpeedLimitSession) -> Self {
35 Self {
36 inner,
37 write_limiter,
38 write_state: WriteState::Idle
39 }
40 }
41}
42
43#[async_trait::async_trait]
44impl <S: DatagramSend> DatagramSend for LimitDatagramSend<S> {
45 type Error = S::Error;
46
47 async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
48 match &mut self.write_state {
49 WriteState::Idle => {
50 let write_len = self.write_limiter.until_ready().await;
51 self.inner.send_to(buf).await?;
52 if buf.len() > write_len {
53 self.write_state = WriteState::Idle;
54 } else {
55 self.write_state = WriteState::Writing((write_len, buf.len()));
56 }
57 Ok(buf.len())
58 }
59 WriteState::Writing((write_len, written_len)) => {
60 self.inner.send_to(buf).await?;
61 if *written_len + buf.len() >= *write_len {
62 self.write_state = WriteState::Idle;
63 Ok(buf.len())
64 } else {
65 self.write_state = WriteState::Writing((*write_len, *written_len + buf.len()));
66 Ok(buf.len())
67 }
68 },
69 }
70 }
71}
72
73pub struct LimitDatagramRecv<R: DatagramRecv> {
74 inner: R,
75 read_limiter: SpeedLimitSession,
76 read_state: ReadState,
77}
78
79impl<R: DatagramRecv> LimitDatagramRecv<R> {
80 pub fn new(inner: R, read_limiter: SpeedLimitSession) -> Self {
81 Self {
82 inner,
83 read_limiter,
84 read_state: ReadState::Idle,
85 }
86 }
87}
88
89#[async_trait::async_trait]
90impl<R: DatagramRecv> DatagramRecv for LimitDatagramRecv<R> {
91 type Error = R::Error;
92 async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
93 match &mut self.read_state {
94 ReadState::Idle => {
95 let read_len = self.read_limiter.until_ready().await;
96 let len = self.inner.recv_from(buf).await?;
97 if len > read_len {
98 self.read_state = ReadState::Idle;
99 Ok(len)
100 } else {
101 self.read_state = ReadState::Reading((read_len, len));
102 Ok(len)
103 }
104 },
105 ReadState::Reading((read_len, readded_len)) => {
106 let len = self.inner.recv_from(buf).await?;
107 if *readded_len + len >= *read_len {
108 self.read_state = ReadState::Idle;
109 } else {
110 self.read_state = ReadState::Reading((*read_len, *readded_len + len));
111 }
112 Ok(len)
113 },
114 }
115 }
116}
117
118#[async_trait::async_trait]
119pub trait Datagram: Send + 'static {
120 type Error;
121 async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
122 async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
123}
124
125pub struct LimitDatagram<D: Datagram> {
126 inner: D,
127 write_limiter: SpeedLimitSession,
128 read_limiter: SpeedLimitSession,
129 read_state: ReadState,
130 write_state: WriteState,
131}
132
133impl<D: Datagram> LimitDatagram<D> {
134 pub fn new(inner: D, read_limit: SpeedLimitSession, write_limit: SpeedLimitSession) -> Self {
135 Self { inner,
136 write_limiter: write_limit,
137 read_limiter: read_limit,
138 read_state: ReadState::Idle,
139 write_state: WriteState::Idle,
140 }
141 }
142}
143
144#[async_trait::async_trait]
145impl<D: Datagram> Datagram for LimitDatagram<D> {
146 type Error = D::Error;
147
148 async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
149 match &mut self.write_state {
150 WriteState::Idle => {
151 let write_len = self.write_limiter.until_ready().await;
152 self.inner.send_to(buf).await?;
153 if buf.len() > write_len {
154 self.write_state = WriteState::Idle;
155 } else {
156 self.write_state = WriteState::Writing((write_len, buf.len()));
157 }
158 Ok(buf.len())
159 }
160 WriteState::Writing((write_len, written_len)) => {
161 self.inner.send_to(buf).await?;
162 if *written_len + buf.len() >= *write_len {
163 self.write_state = WriteState::Idle;
164 Ok(buf.len())
165 } else {
166 self.write_state = WriteState::Writing((*write_len, *written_len + buf.len()));
167 Ok(buf.len())
168 }
169 },
170 }
171 }
172
173 async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
174 match &mut self.read_state {
175 ReadState::Idle => {
176 let read_len = self.read_limiter.until_ready().await;
177 let len = self.inner.recv_from(buf).await?;
178 if len > read_len {
179 self.read_state = ReadState::Idle;
180 Ok(len)
181 } else {
182 self.read_state = ReadState::Reading((read_len, len));
183 Ok(len)
184 }
185 },
186 ReadState::Reading((read_len, readded_len)) => {
187 let len = self.inner.recv_from(buf).await?;
188 if *readded_len + len >= *read_len {
189 self.read_state = ReadState::Idle;
190 } else {
191 self.read_state = ReadState::Reading((*read_len, *readded_len + len));
192 }
193 Ok(len)
194 },
195 }
196 }
197}
198
199#[cfg_attr(coverage_nightly, coverage(off))]
200#[cfg(test)]
201mod tests {
202 use super::*;
203 use async_trait::async_trait;
204 use std::sync::{Arc, Mutex};
205 use std::collections::VecDeque;
206 use std::num::NonZeroU32;
207 use std::time::{Duration, Instant};
208
209 #[derive(Clone)]
211 struct MockDatagram {
212 call_history: Arc<Mutex<Vec<String>>>,
214 send_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
216 recv_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
217 }
218
219 impl MockDatagram {
220 fn new() -> Self {
221 Self {
222 call_history: Arc::new(Mutex::new(Vec::new())),
223 send_returns: Arc::new(Mutex::new(VecDeque::new())),
224 recv_returns: Arc::new(Mutex::new(VecDeque::new())),
225 }
226 }
227
228 fn with_send_result(self, result: Result<usize, &'static str>) -> Self {
230 self.send_returns.lock().unwrap().push_back(result);
231 self
232 }
233
234 fn with_recv_result(self, result: Result<usize, &'static str>) -> Self {
236 self.recv_returns.lock().unwrap().push_back(result);
237 self
238 }
239
240 fn get_call_history(&self) -> Vec<String> {
242 self.call_history.lock().unwrap().clone()
243 }
244 }
245
246 #[async_trait]
247 impl Datagram for MockDatagram {
248 type Error = &'static str;
249
250 async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
251 self.call_history.lock().unwrap().push("send_to".to_string());
252 match self.send_returns.lock().unwrap().pop_front() {
253 Some(_result) => Ok(buf.len()),
254 None => Ok(buf.len()), }
256 }
257
258 async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
259 self.call_history.lock().unwrap().push("recv_from".to_string());
260 match self.recv_returns.lock().unwrap().pop_front() {
261 Some(_result) => Ok(buf.len()),
262 None => Ok(buf.len()), }
264 }
265 }
266
267 struct MockLimitRef {
269 read_limit: Option<usize>,
270 write_limit: Option<usize>,
271 }
272
273 #[tokio::test]
275 async fn test_limit_datagram_new() {
276 let mock_datagram = MockDatagram::new();
277 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
278 let read_limit = read_limiter.new_limit_session();
279 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
280 let write_limit = write_limiter.new_limit_session();
281 let limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
282
283 match limit_datagram.read_state {
285 ReadState::Idle => (),
286 _ => panic!("Expected ReadState::Idle"),
287 }
288 match limit_datagram.write_state {
289 WriteState::Idle => (),
290 _ => panic!("Expected WriteState::Idle"),
291 }
292 }
293
294 #[tokio::test]
296 async fn test_send_to_without_write_limit() {
297 let mock_datagram = MockDatagram::new().with_send_result(Ok(10));
298 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
299 let read_limit = read_limiter.new_limit_session();
300 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
301 let write_limit = write_limiter.new_limit_session();
302 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
303
304 let buffer = [0u8; 10];
305 let result = limit_datagram.send_to(&buffer).await;
306
307 assert_eq!(result, Ok(10));
309 }
310
311 #[tokio::test]
313 async fn test_recv_from_without_read_limit() {
314 let mock_datagram = MockDatagram::new().with_recv_result(Ok(10));
315 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::MAX), Some(NonZeroU32::new(1024).unwrap()));
316 let read_limit = read_limiter.new_limit_session();
317 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
318 let write_limit = write_limiter.new_limit_session();
319 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
320
321 let mut buffer = [0u8; 10];
322 let result = limit_datagram.recv_from(&mut buffer).await;
323
324 assert_eq!(result, Ok(10));
326 }
327
328 #[tokio::test]
330 async fn test_send_to_with_write_limit_initial() {
331 let mock_datagram = MockDatagram::new().with_send_result(Ok(10));
332 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::MAX), Some(NonZeroU32::new(1024).unwrap()));
333 let read_limit = read_limiter.new_limit_session();
334 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
335 let write_limit = write_limiter.new_limit_session();
336 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
337
338 let buffer = [0u8; 10];
339 let result = limit_datagram.send_to(&buffer).await;
340
341 assert!(result.is_ok());
343 }
344
345 #[tokio::test]
347 async fn test_recv_from_with_read_limit_initial() {
348 let mock_datagram = MockDatagram::new().with_recv_result(Ok(10));
349 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
350 let read_limit = read_limiter.new_limit_session();
351 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
352 let write_limit = write_limiter.new_limit_session();
353 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
354
355 let mut buffer = [0u8; 10];
356 let result = limit_datagram.recv_from(&mut buffer).await;
357
358 assert!(result.is_ok());
360 }
361
362 #[tokio::test]
364 async fn test_send_to_in_writing_state() {
365 let mock_datagram = MockDatagram::new()
366 .with_send_result(Ok(5))
367 .with_send_result(Ok(5));
368
369 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
370 let read_limit = read_limiter.new_limit_session();
371 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(10).unwrap()));
372 let write_limit = write_limiter.new_limit_session();
373 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
374
375 let buffer1 = [0u8; 5];
377 let _ = limit_datagram.send_to(&buffer1).await;
378
379 match limit_datagram.write_state {
381 WriteState::Writing(_) => (),
382 _ => panic!("Expected WriteState::Writing"),
383 }
384
385 let start = Instant::now();
386 let buffer2 = [0u8; 2];
388 let result = limit_datagram.send_to(&buffer2).await;
389 assert!(start.elapsed() <= Duration::from_millis(50));
390 assert_eq!(result, Ok(2));
392
393 let buffer2 = [0u8; 5];
395 let result = limit_datagram.send_to(&buffer2).await;
396
397 assert_eq!(result, Ok(5));
399 assert!(start.elapsed() <= Duration::from_millis(100));
400 let buffer2 = [0u8; 5];
402 let result = limit_datagram.send_to(&buffer2).await;
403
404 assert_eq!(result, Ok(5));
406 assert!(start.elapsed() >= Duration::from_millis(900));
407 }
408
409 #[tokio::test]
411 async fn test_recv_from_in_reading_state() {
412 let mock_datagram = MockDatagram::new()
413 .with_recv_result(Ok(5))
414 .with_recv_result(Ok(5));
415 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(10).unwrap()));
416 let read_limit = read_limiter.new_limit_session();
417 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(10).unwrap()));
418 let write_limit = write_limiter.new_limit_session();
419 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
420
421 let mut buffer1 = [0u8; 5];
423 let _ = limit_datagram.recv_from(&mut buffer1).await;
424
425 match limit_datagram.read_state {
427 ReadState::Reading(_) => (),
428 _ => panic!("Expected ReadState::Reading"),
429 }
430
431 let start = Instant::now();
432 let mut buffer2 = [0u8; 2];
434 let result = limit_datagram.recv_from(&mut buffer2).await;
435 assert!(start.elapsed() <= Duration::from_millis(50));
436
437 assert_eq!(result, Ok(2));
439
440 let mut buffer2 = [0u8; 5];
441 let result = limit_datagram.recv_from(&mut buffer2).await;
442 assert!(start.elapsed() <= Duration::from_millis(100));
443
444 assert_eq!(result, Ok(5));
446
447 let mut buffer2 = [0u8; 5];
448
449 let result = limit_datagram.recv_from(&mut buffer2).await;
450 assert!(start.elapsed() > Duration::from_millis(900));
451
452 assert_eq!(result, Ok(5));
454 }
455
456 #[tokio::test]
458 async fn test_send_to_complete_writing_state() {
459 let mock_datagram = MockDatagram::new()
460 .with_send_result(Ok(5))
461 .with_send_result(Ok(5));
462 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
463 let read_limit = read_limiter.new_limit_session();
464 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
465 let write_limit = write_limiter.new_limit_session();
466 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
467
468 let buffer1 = [0u8; 5];
470 let _ = limit_datagram.send_to(&buffer1).await;
471
472 let buffer2 = [0u8; 5];
474 let _ = limit_datagram.send_to(&buffer2).await;
475
476 match limit_datagram.write_state {
478 WriteState::Idle => (),
479 _ => panic!("Expected WriteState::Idle"),
480 }
481 }
482
483 #[tokio::test]
485 async fn test_recv_from_complete_reading_state() {
486 let mock_datagram = MockDatagram::new()
487 .with_recv_result(Ok(5))
488 .with_recv_result(Ok(5));
489 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
490 let read_limit = read_limiter.new_limit_session();
491 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
492 let write_limit = write_limiter.new_limit_session();
493 let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
494
495 let mut buffer1 = [0u8; 5];
497 let _ = limit_datagram.recv_from(&mut buffer1).await;
498
499 let mut buffer2 = [0u8; 5];
501 let _ = limit_datagram.recv_from(&mut buffer2).await;
502
503 match limit_datagram.read_state {
505 ReadState::Idle => (),
506 _ => panic!("Expected ReadState::Idle"),
507 }
508 }
509
510
511 #[derive(Clone)]
513 struct MockDatagramSend {
514 call_history: Arc<Mutex<Vec<String>>>,
515 send_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
516 }
517
518 impl MockDatagramSend {
519 fn new() -> Self {
520 Self {
521 call_history: Arc::new(Mutex::new(Vec::new())),
522 send_returns: Arc::new(Mutex::new(VecDeque::new())),
523 }
524 }
525
526 fn with_send_result(self, result: Result<usize, &'static str>) -> Self {
527 self.send_returns.lock().unwrap().push_back(result);
528 self
529 }
530
531 fn get_call_history(&self) -> Vec<String> {
532 self.call_history.lock().unwrap().clone()
533 }
534 }
535
536 #[async_trait]
537 impl DatagramSend for MockDatagramSend {
538 type Error = &'static str;
539
540 async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
541 self.call_history.lock().unwrap().push("send_to".to_string());
542 let result = self.send_returns.lock().unwrap().pop_front().unwrap_or(Ok(buf.len()));
543 tokio::time::sleep(Duration::from_millis(10)).await;
545 result
546 }
547 }
548
549 #[derive(Clone)]
551 struct MockDatagramRecv {
552 call_history: Arc<Mutex<Vec<String>>>,
553 recv_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
554 }
555
556 impl MockDatagramRecv {
557 fn new() -> Self {
558 Self {
559 call_history: Arc::new(Mutex::new(Vec::new())),
560 recv_returns: Arc::new(Mutex::new(VecDeque::new())),
561 }
562 }
563
564 fn with_recv_result(self, result: Result<usize, &'static str>) -> Self {
565 self.recv_returns.lock().unwrap().push_back(result);
566 self
567 }
568
569 fn get_call_history(&self) -> Vec<String> {
570 self.call_history.lock().unwrap().clone()
571 }
572 }
573
574 #[async_trait]
575 impl DatagramRecv for MockDatagramRecv {
576 type Error = &'static str;
577
578 async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
579 self.call_history.lock().unwrap().push("recv_from".to_string());
580 let result = self.recv_returns.lock().unwrap().pop_front().unwrap_or(Ok(buf.len()));
581 tokio::time::sleep(Duration::from_millis(10)).await;
583 result
584 }
585 }
586
587 #[tokio::test]
589 async fn test_limit_datagram_send_new() {
590 let mock_sender = MockDatagramSend::new();
591 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
592 let write_limit = write_limiter.new_limit_session();
593 let limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
594
595 match limit_sender.write_state {
596 WriteState::Idle => (),
597 _ => panic!("Expected WriteState::Idle"),
598 }
599 }
600
601 #[tokio::test]
603 async fn test_limit_datagram_send_without_limit() {
604 let mock_sender = MockDatagramSend::new().with_send_result(Ok(10));
605 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(100).unwrap()), Some(NonZeroU32::new(100).unwrap()));
606 let write_limit = write_limiter.new_limit_session();
607 let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
608
609 let buffer = [0u8; 10];
610 let result = limit_sender.send_to(&buffer).await;
611
612 assert_eq!(result, Ok(10));
613 }
614
615 #[tokio::test]
617 async fn test_limit_datagram_send_with_limit() {
618 let mock_sender = MockDatagramSend::new().with_send_result(Ok(5));
619 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
620 let write_limit = write_limiter.new_limit_session();
621 let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
622
623 let buffer = [0u8; 5];
624 let result = limit_sender.send_to(&buffer).await;
625
626 assert!(result.is_ok());
627 }
628
629 #[tokio::test]
631 async fn test_limit_datagram_send_in_writing_state() {
632 let mock_sender = MockDatagramSend::new()
633 .with_send_result(Ok(5))
634 .with_send_result(Ok(3));
635
636 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(60).unwrap()));
637 let write_limit = write_limiter.new_limit_session();
638 let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
639
640 let buffer1 = [0u8; 5];
642 let _ = limit_sender.send_to(&buffer1).await;
643
644 match &limit_sender.write_state {
646 WriteState::Writing((limit, written)) => {
647 assert_eq!(*limit, 60);
648 assert_eq!(*written, 5);
649 },
650 _ => panic!("Expected WriteState::Writing"),
651 }
652
653 let buffer2 = [0u8; 3];
655 let result = limit_sender.send_to(&buffer2).await;
656
657 assert_eq!(result, Ok(3));
659 }
660
661 #[tokio::test]
663 async fn test_limit_datagram_send_complete_writing_state() {
664 let mock_sender = MockDatagramSend::new()
665 .with_send_result(Ok(3))
666 .with_send_result(Ok(3));
667
668 let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(5).unwrap()), Some(NonZeroU32::new(5).unwrap()));
669 let write_limit = write_limiter.new_limit_session();
670 let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
671
672 let buffer1 = [0u8; 3];
674 let _ = limit_sender.send_to(&buffer1).await;
675
676 match &limit_sender.write_state {
678 WriteState::Writing((limit, written)) => {
679 assert_eq!(*limit, 5);
680 assert_eq!(*written, 3);
681 },
682 _ => panic!("Expected WriteState::Writing"),
683 }
684
685 let buffer2 = [0u8; 3];
687 let _ = limit_sender.send_to(&buffer2).await;
688
689 match limit_sender.write_state {
691 WriteState::Idle => (),
692 _ => panic!("Expected WriteState::Idle"),
693 }
694 }
695
696 #[tokio::test]
698 async fn test_limit_datagram_recv_new() {
699 let mock_receiver = MockDatagramRecv::new();
700 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
701 let read_limit = read_limiter.new_limit_session();
702 let limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
703
704 match limit_receiver.read_state {
705 ReadState::Idle => (),
706 _ => panic!("Expected ReadState::Idle"),
707 }
708 }
709
710 #[tokio::test]
712 async fn test_limit_datagram_recv_without_limit() {
713 let mock_receiver = MockDatagramRecv::new().with_recv_result(Ok(10));
714 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(100).unwrap()), Some(NonZeroU32::new(100).unwrap()));
715 let read_limit = read_limiter.new_limit_session();
716 let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
717
718 let mut buffer = [0u8; 10];
719 let result = limit_receiver.recv_from(&mut buffer).await;
720
721 assert_eq!(result, Ok(10));
722 }
723
724 #[tokio::test]
726 async fn test_limit_datagram_recv_with_limit() {
727 let mock_receiver = MockDatagramRecv::new().with_recv_result(Ok(5));
728 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
729 let read_limit = read_limiter.new_limit_session();
730 let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
731
732 let mut buffer = [0u8; 5];
733 let result = limit_receiver.recv_from(&mut buffer).await;
734
735 assert!(result.is_ok());
736 }
737
738 #[tokio::test]
740 async fn test_limit_datagram_recv_in_reading_state() {
741 let mock_receiver = MockDatagramRecv::new()
742 .with_recv_result(Ok(5))
743 .with_recv_result(Ok(3));
744
745 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(60).unwrap()));
746 let read_limit = read_limiter.new_limit_session();
747 let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
748
749 let mut buffer1 = [0u8; 5];
751 let _ = limit_receiver.recv_from(&mut buffer1).await;
752
753 match &limit_receiver.read_state {
755 ReadState::Reading((limit, read)) => {
756 assert_eq!(*limit, 60);
757 assert_eq!(*read, 5);
758 },
759 _ => panic!("Expected ReadState::Reading"),
760 }
761
762 let mut buffer2 = [0u8; 3];
764 let result = limit_receiver.recv_from(&mut buffer2).await;
765
766 assert_eq!(result, Ok(3));
768 }
769
770 #[tokio::test]
772 async fn test_limit_datagram_recv_complete_reading_state() {
773 let mock_receiver = MockDatagramRecv::new()
774 .with_recv_result(Ok(3))
775 .with_recv_result(Ok(3));
776
777 let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(5).unwrap()), Some(NonZeroU32::new(5).unwrap()));
778 let read_limit = read_limiter.new_limit_session();
779 let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
780
781 let mut buffer1 = [0u8; 3];
783 let _ = limit_receiver.recv_from(&mut buffer1).await;
784
785 match &limit_receiver.read_state {
787 ReadState::Reading((limit, read)) => {
788 assert_eq!(*limit, 5);
789 assert_eq!(*read, 3);
790 },
791 _ => panic!("Expected ReadState::Reading"),
792 }
793
794 let mut buffer2 = [0u8; 3];
796 let _ = limit_receiver.recv_from(&mut buffer2).await;
797
798 match limit_receiver.read_state {
800 ReadState::Idle => (),
801 _ => panic!("Expected ReadState::Idle"),
802 }
803 }
804
805}