proto_tower_util/
lib.rs

1//! Proto-tower-util is a collection of utilities that are used across the proto-tower project.
2
3mod bytes_mut_ext;
4mod chan;
5pub mod debug;
6mod debug_io_layer;
7mod test_io_service;
8mod timout_counter;
9mod write_to;
10
11pub use bytes_mut_ext::BytesMutHelper;
12pub use chan::sx_rx_chans;
13pub use debug_io_layer::{DebugIoLayer, DebugIoService};
14pub use test_io_service::TestIoService;
15pub use timout_counter::{CountOrDuration, NextTimeout, TimeoutCounter};
16pub use write_to::WriteTo;
17
18use std::cmp::min;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::time::Duration;
21use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
22
23pub enum ZeroReadBehaviour {
24    TickAndYield,
25    TickSleep,
26    // TickMeasure,
27}
28
29/// A helper to read data within a timeout
30/// This handles both the cases where the read(&mut buffer) may not return any data or constantly return len 0
31pub struct AsyncReadToBuf<const S: usize> {
32    zero_read_behaviour: ZeroReadBehaviour,
33}
34
35impl AsyncReadToBuf<1024> {
36    pub const fn new_1024(zero_read_behaviour: ZeroReadBehaviour) -> AsyncReadToBuf<1024> {
37        AsyncReadToBuf::<1024> { zero_read_behaviour }
38    }
39}
40
41impl<const S: usize> AsyncReadToBuf<S> {
42    pub const fn new(zero_read_behaviour: ZeroReadBehaviour) -> Self {
43        AsyncReadToBuf::<S> { zero_read_behaviour }
44    }
45
46    pub async fn read_until<READ: AsyncReadExt + Unpin + Send + 'static>(&self, reader: &mut BufReader<READ>, byte: u8) -> Vec<u8> {
47        let mut buf = Vec::with_capacity(S);
48        reader.read_until(byte, &mut buf).await.unwrap();
49        buf
50    }
51    pub async fn read_with_timeout_bytes<READ: AsyncReadExt + Unpin + Send + 'static>(
52        &self,
53        reader: &mut READ,
54        _writer: &mut bytes::BytesMut,
55        timeout: Duration,
56        desired_size: Option<usize>,
57    ) {
58        // TODO(https://github.com/rapidrecast/proto-tower/issues/1): Use async read buffer
59        let mut data = match desired_size {
60            None => bytes::BytesMut::new(),
61            Some(sz) => bytes::BytesMut::with_capacity(sz),
62        };
63        let mut buffer = [0u8; S];
64        let finished = AtomicBool::new(false);
65        let tries = AtomicU32::new(0);
66        const MAX_TRIES: u32 = 10;
67        let calculated_timeout = timeout / MAX_TRIES;
68        while !finished.load(Ordering::SeqCst) && tries.load(Ordering::SeqCst) < MAX_TRIES {
69            let buf_size = match desired_size {
70                None => S,
71                Some(sz) => min(S, sz - data.len()),
72            };
73            tokio::select! {
74                _ = tokio::time::sleep(calculated_timeout) => {
75                    let new_tries = tries.fetch_add(1, Ordering::SeqCst) + 1;
76                    if new_tries >= MAX_TRIES {
77                        finished.store(true, Ordering::SeqCst);
78                    }
79                }
80                n = reader.read(&mut buffer[..buf_size]) => {
81                    match n {
82                        Ok(n) => {
83                            if n != 0 {
84                                data.extend_from_slice(&buffer[..n]);
85                                tries.store(0, Ordering::SeqCst);
86                            } else {
87                                match self.zero_read_behaviour {
88                                    ZeroReadBehaviour::TickAndYield => {
89                                        tries.fetch_add(1, Ordering::SeqCst);
90                                        tokio::task::yield_now().await;
91                                    }
92                                    ZeroReadBehaviour::TickSleep => {
93                                        tries.fetch_add(1, Ordering::SeqCst);
94                                        tokio::time::sleep(calculated_timeout).await;
95                                    }
96                                }
97
98                            }
99                        }
100                        Err(_) => {
101                            finished.store(true, Ordering::SeqCst);
102                        }
103                    }
104                }
105            }
106        }
107    }
108
109    pub async fn read_with_timeout<READ: AsyncReadExt + Unpin + Send + 'static>(&self, reader: &mut READ, timeout: Duration, desired_size: Option<usize>) -> Vec<u8> {
110        let mut buf = bytes::BytesMut::new();
111        self.read_with_timeout_bytes(reader, &mut buf, timeout, desired_size).await;
112        buf.to_vec()
113    }
114}
115
116#[cfg(test)]
117mod test {
118    use crate::{AsyncReadToBuf, ZeroReadBehaviour};
119    use std::io::Cursor;
120    use std::time::Duration;
121
122    #[tokio::test]
123    async fn test_limited_read() {
124        let data = b"hello world";
125        let mut cursor = Cursor::new(data);
126        let async_read = AsyncReadToBuf::new_1024(ZeroReadBehaviour::TickAndYield);
127        let result = async_read.read_with_timeout(&mut cursor, Duration::from_secs(1), Some(5)).await;
128        assert_eq!(result, b"hello");
129    }
130
131    #[tokio::test]
132    async fn test_large_read() {
133        let data = [0xff; 4096];
134        let mut cursor = Cursor::new(data);
135        let async_read = AsyncReadToBuf::new_1024(ZeroReadBehaviour::TickAndYield);
136        let result = async_read.read_with_timeout(&mut cursor, Duration::from_secs(1), Some(2050)).await;
137        assert_eq!(result, &[0xff; 2050]);
138    }
139}