sfo_io/
limit_datagram.rs

1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2
3use crate::SpeedLimitSession;
4
5#[async_trait::async_trait]
6pub trait DatagramSend: Send + 'static {
7    type Error;
8    async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
9}
10
11#[async_trait::async_trait]
12pub trait DatagramRecv: Send + 'static {
13    type Error;
14    async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
15}
16
/// Progress of the receive side through the quota granted by the read limiter.
///
/// The derives make the state printable and comparable, so callers and tests
/// can use `assert_eq!` / `{:?}` instead of hand-written `match` + `panic!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadState {
    /// No quota is in progress; the next receive waits on the limiter first.
    Idle,
    /// A quota is being consumed: `(quota, bytes counted so far)`.
    Reading((usize, usize)),
}
21
/// Progress of the send side through the quota granted by the write limiter.
///
/// The derives make the state printable and comparable, so callers and tests
/// can use `assert_eq!` / `{:?}` instead of hand-written `match` + `panic!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WriteState {
    /// No quota is in progress; the next send waits on the limiter first.
    Idle,
    /// A quota is being consumed: `(quota, bytes counted so far)`.
    Writing((usize, usize)),
}
26
27pub struct LimitDatagramSend<S: DatagramSend> {
28    inner: S,
29    write_limiter: SpeedLimitSession,
30    write_state: WriteState,
31}
32
33impl <S: DatagramSend> LimitDatagramSend<S> {
34    pub fn new(inner: S, write_limiter: SpeedLimitSession) -> Self {
35        Self {
36            inner,
37            write_limiter,
38            write_state: WriteState::Idle
39        }
40    }
41}
42
43#[async_trait::async_trait]
44impl <S: DatagramSend> DatagramSend for LimitDatagramSend<S> {
45    type Error = S::Error;
46
47    async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
48        match &mut self.write_state {
49            WriteState::Idle => {
50                let write_len = self.write_limiter.until_ready().await;
51                self.inner.send_to(buf).await?;
52                if buf.len() > write_len {
53                    self.write_state = WriteState::Idle;
54                } else {
55                    self.write_state = WriteState::Writing((write_len, buf.len()));
56                }
57                Ok(buf.len())
58            }
59            WriteState::Writing((write_len, written_len)) => {
60                self.inner.send_to(buf).await?;
61                if *written_len + buf.len() >= *write_len {
62                    self.write_state = WriteState::Idle;
63                    Ok(buf.len())
64                } else {
65                    self.write_state = WriteState::Writing((*write_len, *written_len + buf.len()));
66                    Ok(buf.len())
67                }
68            },
69        }
70    }
71}
72
73pub struct LimitDatagramRecv<R: DatagramRecv> {
74    inner: R,
75    read_limiter: SpeedLimitSession,
76    read_state: ReadState,
77}
78
79impl<R: DatagramRecv> LimitDatagramRecv<R> {
80    pub fn new(inner: R, read_limiter: SpeedLimitSession) -> Self {
81        Self {
82            inner,
83            read_limiter,
84            read_state: ReadState::Idle,
85        }
86    }
87}
88
89#[async_trait::async_trait]
90impl<R: DatagramRecv> DatagramRecv for LimitDatagramRecv<R> {
91    type Error = R::Error;
92    async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
93        match &mut self.read_state {
94            ReadState::Idle => {
95                let read_len = self.read_limiter.until_ready().await;
96                let len = self.inner.recv_from(buf).await?;
97                if len > read_len {
98                    self.read_state = ReadState::Idle;
99                    Ok(len)
100                } else {
101                    self.read_state = ReadState::Reading((read_len, len));
102                    Ok(len)
103                }
104            },
105            ReadState::Reading((read_len, readded_len)) => {
106                let len = self.inner.recv_from(buf).await?;
107                if *readded_len + len >= *read_len {
108                    self.read_state = ReadState::Idle;
109                } else {
110                    self.read_state = ReadState::Reading((*read_len, *readded_len + len));
111                }
112                Ok(len)
113            },
114        }
115    }
116}
117
118#[async_trait::async_trait]
119pub trait Datagram: Send + 'static {
120    type Error;
121    async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error>;
122    async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error>;
123}
124
125pub struct LimitDatagram<D: Datagram> {
126    inner: D,
127    write_limiter: SpeedLimitSession,
128    read_limiter: SpeedLimitSession,
129    read_state: ReadState,
130    write_state: WriteState,
131}
132
133impl<D: Datagram> LimitDatagram<D> {
134    pub fn new(inner: D, read_limit: SpeedLimitSession, write_limit: SpeedLimitSession) -> Self {
135        Self { inner,
136            write_limiter: write_limit,
137            read_limiter: read_limit,
138            read_state: ReadState::Idle,
139            write_state: WriteState::Idle,
140        }
141    }
142}
143
144#[async_trait::async_trait]
145impl<D: Datagram> Datagram for LimitDatagram<D> {
146    type Error = D::Error;
147
148    async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
149        match &mut self.write_state {
150            WriteState::Idle => {
151                let write_len = self.write_limiter.until_ready().await;
152                self.inner.send_to(buf).await?;
153                if buf.len() > write_len {
154                    self.write_state = WriteState::Idle;
155                } else {
156                    self.write_state = WriteState::Writing((write_len, buf.len()));
157                }
158                Ok(buf.len())
159            }
160            WriteState::Writing((write_len, written_len)) => {
161                self.inner.send_to(buf).await?;
162                if *written_len + buf.len() >= *write_len {
163                    self.write_state = WriteState::Idle;
164                    Ok(buf.len())
165                } else {
166                    self.write_state = WriteState::Writing((*write_len, *written_len + buf.len()));
167                    Ok(buf.len())
168                }
169            },
170        }
171    }
172
173    async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
174        match &mut self.read_state {
175            ReadState::Idle => {
176                let read_len = self.read_limiter.until_ready().await;
177                let len = self.inner.recv_from(buf).await?;
178                if len > read_len {
179                    self.read_state = ReadState::Idle;
180                    Ok(len)
181                } else {
182                    self.read_state = ReadState::Reading((read_len, len));
183                    Ok(len)
184                }
185            },
186            ReadState::Reading((read_len, readded_len)) => {
187                let len = self.inner.recv_from(buf).await?;
188                if *readded_len + len >= *read_len {
189                    self.read_state = ReadState::Idle;
190                } else {
191                    self.read_state = ReadState::Reading((*read_len, *readded_len + len));
192                }
193                Ok(len)
194            },
195        }
196    }
197}
198
199#[cfg_attr(coverage_nightly, coverage(off))]
200#[cfg(test)]
201mod tests {
202    use super::*;
203    use async_trait::async_trait;
204    use std::sync::{Arc, Mutex};
205    use std::collections::VecDeque;
206    use std::num::NonZeroU32;
207    use std::time::{Duration, Instant};
208
209    // Mock Datagram 实现用于测试
210    #[derive(Clone)]
211    struct MockDatagram {
212        // 记录调用历史
213        call_history: Arc<Mutex<Vec<String>>>,
214        // 预设的返回值队列
215        send_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
216        recv_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
217    }
218
219    impl MockDatagram {
220        fn new() -> Self {
221            Self {
222                call_history: Arc::new(Mutex::new(Vec::new())),
223                send_returns: Arc::new(Mutex::new(VecDeque::new())),
224                recv_returns: Arc::new(Mutex::new(VecDeque::new())),
225            }
226        }
227
228        // 设置send_to的返回值
229        fn with_send_result(self, result: Result<usize, &'static str>) -> Self {
230            self.send_returns.lock().unwrap().push_back(result);
231            self
232        }
233
234        // 设置recv_from的返回值
235        fn with_recv_result(self, result: Result<usize, &'static str>) -> Self {
236            self.recv_returns.lock().unwrap().push_back(result);
237            self
238        }
239
240        // 获取调用历史
241        fn get_call_history(&self) -> Vec<String> {
242            self.call_history.lock().unwrap().clone()
243        }
244    }
245
246    #[async_trait]
247    impl Datagram for MockDatagram {
248        type Error = &'static str;
249
250        async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
251            self.call_history.lock().unwrap().push("send_to".to_string());
252            match self.send_returns.lock().unwrap().pop_front() {
253                Some(_result) => Ok(buf.len()),
254                None => Ok(buf.len()), // 默认返回缓冲区长度
255            }
256        }
257
258        async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
259            self.call_history.lock().unwrap().push("recv_from".to_string());
260            match self.recv_returns.lock().unwrap().pop_front() {
261                Some(_result) => Ok(buf.len()),
262                None => Ok(buf.len()), // 默认返回0
263            }
264        }
265    }
266
267    // Mock LimitRef 实现
268    struct MockLimitRef {
269        read_limit: Option<usize>,
270        write_limit: Option<usize>,
271    }
272
273    // 测试new方法的基本功能
274    #[tokio::test]
275    async fn test_limit_datagram_new() {
276        let mock_datagram = MockDatagram::new();
277        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
278        let read_limit = read_limiter.new_limit_session();
279        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
280        let write_limit = write_limiter.new_limit_session();
281        let limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
282
283        // 验证初始状态
284        match limit_datagram.read_state {
285            ReadState::Idle => (),
286            _ => panic!("Expected ReadState::Idle"),
287        }
288        match limit_datagram.write_state {
289            WriteState::Idle => (),
290            _ => panic!("Expected WriteState::Idle"),
291        }
292    }
293
294    // 测试在无写限制时send_to的行为
295    #[tokio::test]
296    async fn test_send_to_without_write_limit() {
297        let mock_datagram = MockDatagram::new().with_send_result(Ok(10));
298        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
299        let read_limit = read_limiter.new_limit_session();
300        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
301        let write_limit = write_limiter.new_limit_session();
302        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
303
304        let buffer = [0u8; 10];
305        let result = limit_datagram.send_to(&buffer).await;
306
307        // 验证结果
308        assert_eq!(result, Ok(10));
309    }
310
311    // 测试在无读限制时recv_from的行为
312    #[tokio::test]
313    async fn test_recv_from_without_read_limit() {
314        let mock_datagram = MockDatagram::new().with_recv_result(Ok(10));
315        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::MAX), Some(NonZeroU32::new(1024).unwrap()));
316        let read_limit = read_limiter.new_limit_session();
317        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
318        let write_limit = write_limiter.new_limit_session();
319        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
320
321        let mut buffer = [0u8; 10];
322        let result = limit_datagram.recv_from(&mut buffer).await;
323
324        // 验证结果
325        assert_eq!(result, Ok(10));
326    }
327
328    // 测试有写限制时首次send_to的行为
329    #[tokio::test]
330    async fn test_send_to_with_write_limit_initial() {
331        let mock_datagram = MockDatagram::new().with_send_result(Ok(10));
332        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::MAX), Some(NonZeroU32::new(1024).unwrap()));
333        let read_limit = read_limiter.new_limit_session();
334        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
335        let write_limit = write_limiter.new_limit_session();
336        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
337
338        let buffer = [0u8; 10];
339        let result = limit_datagram.send_to(&buffer).await;
340
341        // 验证结果
342        assert!(result.is_ok());
343    }
344
345    // 测试有读限制时首次recv_from的行为
346    #[tokio::test]
347    async fn test_recv_from_with_read_limit_initial() {
348        let mock_datagram = MockDatagram::new().with_recv_result(Ok(10));
349        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
350        let read_limit = read_limiter.new_limit_session();
351        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
352        let write_limit = write_limiter.new_limit_session();
353        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
354
355        let mut buffer = [0u8; 10];
356        let result = limit_datagram.recv_from(&mut buffer).await;
357
358        // 验证结果
359        assert!(result.is_ok());
360    }
361
362    // 测试Writing状态下的send_to行为
363    #[tokio::test]
364    async fn test_send_to_in_writing_state() {
365        let mock_datagram = MockDatagram::new()
366            .with_send_result(Ok(5))
367            .with_send_result(Ok(5));
368
369        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
370        let read_limit = read_limiter.new_limit_session();
371        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(10).unwrap()));
372        let write_limit = write_limiter.new_limit_session();
373        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
374
375        // 第一次调用进入Writing状态
376        let buffer1 = [0u8; 5];
377        let _ = limit_datagram.send_to(&buffer1).await;
378
379        // 验证状态
380        match limit_datagram.write_state {
381            WriteState::Writing(_) => (),
382            _ => panic!("Expected WriteState::Writing"),
383        }
384
385        let start = Instant::now();
386        // 第二次调用保持在Writing状态
387        let buffer2 = [0u8; 2];
388        let result = limit_datagram.send_to(&buffer2).await;
389        assert!(start.elapsed() <= Duration::from_millis(50));
390        // 验证结果
391        assert_eq!(result, Ok(2));
392
393        // 第二次调用保持在Writing状态
394        let buffer2 = [0u8; 5];
395        let result = limit_datagram.send_to(&buffer2).await;
396
397        // 验证结果
398        assert_eq!(result, Ok(5));
399        assert!(start.elapsed() <= Duration::from_millis(100));
400        // 第二次调用保持在Writing状态
401        let buffer2 = [0u8; 5];
402        let result = limit_datagram.send_to(&buffer2).await;
403
404        // 验证结果
405        assert_eq!(result, Ok(5));
406        assert!(start.elapsed() >= Duration::from_millis(900));
407    }
408
409    // 测试Reading状态下的recv_from行为
410    #[tokio::test]
411    async fn test_recv_from_in_reading_state() {
412        let mock_datagram = MockDatagram::new()
413            .with_recv_result(Ok(5))
414            .with_recv_result(Ok(5));
415        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(10).unwrap()));
416        let read_limit = read_limiter.new_limit_session();
417        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(10).unwrap()));
418        let write_limit = write_limiter.new_limit_session();
419        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
420
421        // 第一次调用进入Reading状态
422        let mut buffer1 = [0u8; 5];
423        let _ = limit_datagram.recv_from(&mut buffer1).await;
424
425        // 验证状态
426        match limit_datagram.read_state {
427            ReadState::Reading(_) => (),
428            _ => panic!("Expected ReadState::Reading"),
429        }
430
431        let start = Instant::now();
432        // 第二次调用保持在Reading状态
433        let mut buffer2 = [0u8; 2];
434        let result = limit_datagram.recv_from(&mut buffer2).await;
435        assert!(start.elapsed() <= Duration::from_millis(50));
436
437        // 验证结果
438        assert_eq!(result, Ok(2));
439
440        let mut buffer2 = [0u8; 5];
441        let result = limit_datagram.recv_from(&mut buffer2).await;
442        assert!(start.elapsed() <= Duration::from_millis(100));
443
444        // 验证结果
445        assert_eq!(result, Ok(5));
446
447        let mut buffer2 = [0u8; 5];
448
449        let result = limit_datagram.recv_from(&mut buffer2).await;
450        assert!(start.elapsed() > Duration::from_millis(900));
451
452        // 验证结果
453        assert_eq!(result, Ok(5));
454    }
455
456    // 测试Writing状态完成后回到Idle状态
457    #[tokio::test]
458    async fn test_send_to_complete_writing_state() {
459        let mock_datagram = MockDatagram::new()
460            .with_send_result(Ok(5))
461            .with_send_result(Ok(5));
462        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
463        let read_limit = read_limiter.new_limit_session();
464        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
465        let write_limit = write_limiter.new_limit_session();
466        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
467
468        // 第一次调用进入Writing状态
469        let buffer1 = [0u8; 5];
470        let _ = limit_datagram.send_to(&buffer1).await;
471
472        // 第二次调用完成Writing状态,回到Idle
473        let buffer2 = [0u8; 5];
474        let _ = limit_datagram.send_to(&buffer2).await;
475
476        // 验证状态回到Idle
477        match limit_datagram.write_state {
478            WriteState::Idle => (),
479            _ => panic!("Expected WriteState::Idle"),
480        }
481    }
482
483    // 测试Reading状态完成后回到Idle状态
484    #[tokio::test]
485    async fn test_recv_from_complete_reading_state() {
486        let mock_datagram = MockDatagram::new()
487            .with_recv_result(Ok(5))
488            .with_recv_result(Ok(5));
489        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
490        let read_limit = read_limiter.new_limit_session();
491        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(1).unwrap()));
492        let write_limit = write_limiter.new_limit_session();
493        let mut limit_datagram = LimitDatagram::new(mock_datagram, read_limit, write_limit);
494
495        // 第一次调用进入Reading状态
496        let mut buffer1 = [0u8; 5];
497        let _ = limit_datagram.recv_from(&mut buffer1).await;
498
499        // 第二次调用完成Reading状态,回到Idle
500        let mut buffer2 = [0u8; 5];
501        let _ = limit_datagram.recv_from(&mut buffer2).await;
502
503        // 验证状态回到Idle
504        match limit_datagram.read_state {
505            ReadState::Idle => (),
506            _ => panic!("Expected ReadState::Idle"),
507        }
508    }
509
510
511    // Mock DatagramSend 实现用于测试 LimitDatagramSend
512    #[derive(Clone)]
513    struct MockDatagramSend {
514        call_history: Arc<Mutex<Vec<String>>>,
515        send_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
516    }
517
518    impl MockDatagramSend {
519        fn new() -> Self {
520            Self {
521                call_history: Arc::new(Mutex::new(Vec::new())),
522                send_returns: Arc::new(Mutex::new(VecDeque::new())),
523            }
524        }
525
526        fn with_send_result(self, result: Result<usize, &'static str>) -> Self {
527            self.send_returns.lock().unwrap().push_back(result);
528            self
529        }
530
531        fn get_call_history(&self) -> Vec<String> {
532            self.call_history.lock().unwrap().clone()
533        }
534    }
535
536    #[async_trait]
537    impl DatagramSend for MockDatagramSend {
538        type Error = &'static str;
539
540        async fn send_to(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
541            self.call_history.lock().unwrap().push("send_to".to_string());
542            let result = self.send_returns.lock().unwrap().pop_front().unwrap_or(Ok(buf.len()));
543            // 模拟实际发送操作的时间消耗
544            tokio::time::sleep(Duration::from_millis(10)).await;
545            result
546        }
547    }
548
549    // Mock DatagramRecv 实现用于测试 LimitDatagramRecv
550    #[derive(Clone)]
551    struct MockDatagramRecv {
552        call_history: Arc<Mutex<Vec<String>>>,
553        recv_returns: Arc<Mutex<VecDeque<Result<usize, &'static str>>>>,
554    }
555
556    impl MockDatagramRecv {
557        fn new() -> Self {
558            Self {
559                call_history: Arc::new(Mutex::new(Vec::new())),
560                recv_returns: Arc::new(Mutex::new(VecDeque::new())),
561            }
562        }
563
564        fn with_recv_result(self, result: Result<usize, &'static str>) -> Self {
565            self.recv_returns.lock().unwrap().push_back(result);
566            self
567        }
568
569        fn get_call_history(&self) -> Vec<String> {
570            self.call_history.lock().unwrap().clone()
571        }
572    }
573
574    #[async_trait]
575    impl DatagramRecv for MockDatagramRecv {
576        type Error = &'static str;
577
578        async fn recv_from(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
579            self.call_history.lock().unwrap().push("recv_from".to_string());
580            let result = self.recv_returns.lock().unwrap().pop_front().unwrap_or(Ok(buf.len()));
581            // 模拟实际接收操作的时间消耗
582            tokio::time::sleep(Duration::from_millis(10)).await;
583            result
584        }
585    }
586
587    // 测试 LimitDatagramSend 的基本功能
588    #[tokio::test]
589    async fn test_limit_datagram_send_new() {
590        let mock_sender = MockDatagramSend::new();
591        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
592        let write_limit = write_limiter.new_limit_session();
593        let limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
594
595        match limit_sender.write_state {
596            WriteState::Idle => (),
597            _ => panic!("Expected WriteState::Idle"),
598        }
599    }
600
601    // 测试 LimitDatagramSend 在无限制时的行为
602    #[tokio::test]
603    async fn test_limit_datagram_send_without_limit() {
604        let mock_sender = MockDatagramSend::new().with_send_result(Ok(10));
605        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(100).unwrap()), Some(NonZeroU32::new(100).unwrap()));
606        let write_limit = write_limiter.new_limit_session();
607        let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
608
609        let buffer = [0u8; 10];
610        let result = limit_sender.send_to(&buffer).await;
611
612        assert_eq!(result, Ok(10));
613    }
614
615    // 测试 LimitDatagramSend 在有限制时的行为
616    #[tokio::test]
617    async fn test_limit_datagram_send_with_limit() {
618        let mock_sender = MockDatagramSend::new().with_send_result(Ok(5));
619        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
620        let write_limit = write_limiter.new_limit_session();
621        let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
622
623        let buffer = [0u8; 5];
624        let result = limit_sender.send_to(&buffer).await;
625
626        assert!(result.is_ok());
627    }
628
629    // 测试 LimitDatagramSend Writing 状态下的行为
630    #[tokio::test]
631    async fn test_limit_datagram_send_in_writing_state() {
632        let mock_sender = MockDatagramSend::new()
633            .with_send_result(Ok(5))
634            .with_send_result(Ok(3));
635
636        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(60).unwrap()));
637        let write_limit = write_limiter.new_limit_session();
638        let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
639
640        // 第一次调用进入Writing状态
641        let buffer1 = [0u8; 5];
642        let _ = limit_sender.send_to(&buffer1).await;
643
644        // 验证状态
645        match &limit_sender.write_state {
646            WriteState::Writing((limit, written)) => {
647                assert_eq!(*limit, 60);
648                assert_eq!(*written, 5);
649            },
650            _ => panic!("Expected WriteState::Writing"),
651        }
652
653        // 第二次调用保持在Writing状态
654        let buffer2 = [0u8; 3];
655        let result = limit_sender.send_to(&buffer2).await;
656
657        // 验证结果
658        assert_eq!(result, Ok(3));
659    }
660
661    // 测试 LimitDatagramSend Writing 状态完成后回到 Idle 状态
662    #[tokio::test]
663    async fn test_limit_datagram_send_complete_writing_state() {
664        let mock_sender = MockDatagramSend::new()
665            .with_send_result(Ok(3))
666            .with_send_result(Ok(3));
667
668        let write_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(5).unwrap()), Some(NonZeroU32::new(5).unwrap()));
669        let write_limit = write_limiter.new_limit_session();
670        let mut limit_sender = LimitDatagramSend::new(mock_sender, write_limit);
671
672        // 第一次调用进入Writing状态
673        let buffer1 = [0u8; 3];
674        let _ = limit_sender.send_to(&buffer1).await;
675
676        // 验证状态
677        match &limit_sender.write_state {
678            WriteState::Writing((limit, written)) => {
679                assert_eq!(*limit, 5);
680                assert_eq!(*written, 3);
681            },
682            _ => panic!("Expected WriteState::Writing"),
683        }
684
685        // 第二次调用完成后应回到Idle状态
686        let buffer2 = [0u8; 3];
687        let _ = limit_sender.send_to(&buffer2).await;
688
689        // 验证状态回到Idle
690        match limit_sender.write_state {
691            WriteState::Idle => (),
692            _ => panic!("Expected WriteState::Idle"),
693        }
694    }
695
696    // 测试 LimitDatagramRecv 的基本功能
697    #[tokio::test]
698    async fn test_limit_datagram_recv_new() {
699        let mock_receiver = MockDatagramRecv::new();
700        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
701        let read_limit = read_limiter.new_limit_session();
702        let limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
703
704        match limit_receiver.read_state {
705            ReadState::Idle => (),
706            _ => panic!("Expected ReadState::Idle"),
707        }
708    }
709
710    // 测试 LimitDatagramRecv 在无限制时的行为
711    #[tokio::test]
712    async fn test_limit_datagram_recv_without_limit() {
713        let mock_receiver = MockDatagramRecv::new().with_recv_result(Ok(10));
714        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(100).unwrap()), Some(NonZeroU32::new(100).unwrap()));
715        let read_limit = read_limiter.new_limit_session();
716        let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
717
718        let mut buffer = [0u8; 10];
719        let result = limit_receiver.recv_from(&mut buffer).await;
720
721        assert_eq!(result, Ok(10));
722    }
723
724    // 测试 LimitDatagramRecv 在有限制时的行为
725    #[tokio::test]
726    async fn test_limit_datagram_recv_with_limit() {
727        let mock_receiver = MockDatagramRecv::new().with_recv_result(Ok(5));
728        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(10).unwrap()), Some(NonZeroU32::new(10).unwrap()));
729        let read_limit = read_limiter.new_limit_session();
730        let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
731
732        let mut buffer = [0u8; 5];
733        let result = limit_receiver.recv_from(&mut buffer).await;
734
735        assert!(result.is_ok());
736    }
737
738    // 测试 LimitDatagramRecv Reading 状态下的行为
739    #[tokio::test]
740    async fn test_limit_datagram_recv_in_reading_state() {
741        let mock_receiver = MockDatagramRecv::new()
742            .with_recv_result(Ok(5))
743            .with_recv_result(Ok(3));
744
745        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(1).unwrap()), Some(NonZeroU32::new(60).unwrap()));
746        let read_limit = read_limiter.new_limit_session();
747        let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
748
749        // 第一次调用进入Reading状态
750        let mut buffer1 = [0u8; 5];
751        let _ = limit_receiver.recv_from(&mut buffer1).await;
752
753        // 验证状态
754        match &limit_receiver.read_state {
755            ReadState::Reading((limit, read)) => {
756                assert_eq!(*limit, 60);
757                assert_eq!(*read, 5);
758            },
759            _ => panic!("Expected ReadState::Reading"),
760        }
761
762        // 第二次调用保持在Reading状态
763        let mut buffer2 = [0u8; 3];
764        let result = limit_receiver.recv_from(&mut buffer2).await;
765
766        // 验证结果
767        assert_eq!(result, Ok(3));
768    }
769
770    // 测试 LimitDatagramRecv Reading 状态完成后回到 Idle 状态
771    #[tokio::test]
772    async fn test_limit_datagram_recv_complete_reading_state() {
773        let mock_receiver = MockDatagramRecv::new()
774            .with_recv_result(Ok(3))
775            .with_recv_result(Ok(3));
776
777        let read_limiter = crate::SpeedLimiter::new(None, Some(NonZeroU32::new(5).unwrap()), Some(NonZeroU32::new(5).unwrap()));
778        let read_limit = read_limiter.new_limit_session();
779        let mut limit_receiver = LimitDatagramRecv::new(mock_receiver, read_limit);
780
781        // 第一次调用进入Reading状态
782        let mut buffer1 = [0u8; 3];
783        let _ = limit_receiver.recv_from(&mut buffer1).await;
784
785        // 验证状态
786        match &limit_receiver.read_state {
787            ReadState::Reading((limit, read)) => {
788                assert_eq!(*limit, 5);
789                assert_eq!(*read, 3);
790            },
791            _ => panic!("Expected ReadState::Reading"),
792        }
793
794        // 第二次调用完成后应回到Idle状态
795        let mut buffer2 = [0u8; 3];
796        let _ = limit_receiver.recv_from(&mut buffer2).await;
797
798        // 验证状态回到Idle
799        match limit_receiver.read_state {
800            ReadState::Idle => (),
801            _ => panic!("Expected ReadState::Idle"),
802        }
803    }
804
805}