1mod bytes_mut_ext;
4mod chan;
5pub mod debug;
6mod debug_io_layer;
7mod test_io_service;
8mod timout_counter;
9mod write_to;
10
11pub use bytes_mut_ext::BytesMutHelper;
12pub use chan::sx_rx_chans;
13pub use debug_io_layer::{DebugIoLayer, DebugIoService};
14pub use test_io_service::TestIoService;
15pub use timout_counter::{CountOrDuration, NextTimeout, TimeoutCounter};
16pub use write_to::WriteTo;
17
18use std::cmp::min;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::time::Duration;
21use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
22
/// Policy applied by [`AsyncReadToBuf`] when a read completes successfully
/// but yields zero bytes.
///
/// Either variant counts the zero-length read as one failed try; they differ
/// only in how the reading task waits before the next attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ZeroReadBehaviour {
    /// Count a try, then yield to the scheduler (`tokio::task::yield_now`)
    /// before reading again.
    TickAndYield,
    /// Count a try, then sleep for one tick (a tenth of the caller's timeout)
    /// before reading again.
    TickSleep,
}
28
29pub struct AsyncReadToBuf<const S: usize> {
32 zero_read_behaviour: ZeroReadBehaviour,
33}
34
35impl AsyncReadToBuf<1024> {
36 pub const fn new_1024(zero_read_behaviour: ZeroReadBehaviour) -> AsyncReadToBuf<1024> {
37 AsyncReadToBuf::<1024> { zero_read_behaviour }
38 }
39}
40
41impl<const S: usize> AsyncReadToBuf<S> {
42 pub const fn new(zero_read_behaviour: ZeroReadBehaviour) -> Self {
43 AsyncReadToBuf::<S> { zero_read_behaviour }
44 }
45
46 pub async fn read_until<READ: AsyncReadExt + Unpin + Send + 'static>(&self, reader: &mut BufReader<READ>, byte: u8) -> Vec<u8> {
47 let mut buf = Vec::with_capacity(S);
48 reader.read_until(byte, &mut buf).await.unwrap();
49 buf
50 }
51 pub async fn read_with_timeout_bytes<READ: AsyncReadExt + Unpin + Send + 'static>(
52 &self,
53 reader: &mut READ,
54 _writer: &mut bytes::BytesMut,
55 timeout: Duration,
56 desired_size: Option<usize>,
57 ) {
58 let mut data = match desired_size {
60 None => bytes::BytesMut::new(),
61 Some(sz) => bytes::BytesMut::with_capacity(sz),
62 };
63 let mut buffer = [0u8; S];
64 let finished = AtomicBool::new(false);
65 let tries = AtomicU32::new(0);
66 const MAX_TRIES: u32 = 10;
67 let calculated_timeout = timeout / MAX_TRIES;
68 while !finished.load(Ordering::SeqCst) && tries.load(Ordering::SeqCst) < MAX_TRIES {
69 let buf_size = match desired_size {
70 None => S,
71 Some(sz) => min(S, sz - data.len()),
72 };
73 tokio::select! {
74 _ = tokio::time::sleep(calculated_timeout) => {
75 let new_tries = tries.fetch_add(1, Ordering::SeqCst) + 1;
76 if new_tries >= MAX_TRIES {
77 finished.store(true, Ordering::SeqCst);
78 }
79 }
80 n = reader.read(&mut buffer[..buf_size]) => {
81 match n {
82 Ok(n) => {
83 if n != 0 {
84 data.extend_from_slice(&buffer[..n]);
85 tries.store(0, Ordering::SeqCst);
86 } else {
87 match self.zero_read_behaviour {
88 ZeroReadBehaviour::TickAndYield => {
89 tries.fetch_add(1, Ordering::SeqCst);
90 tokio::task::yield_now().await;
91 }
92 ZeroReadBehaviour::TickSleep => {
93 tries.fetch_add(1, Ordering::SeqCst);
94 tokio::time::sleep(calculated_timeout).await;
95 }
96 }
97
98 }
99 }
100 Err(_) => {
101 finished.store(true, Ordering::SeqCst);
102 }
103 }
104 }
105 }
106 }
107 }
108
109 pub async fn read_with_timeout<READ: AsyncReadExt + Unpin + Send + 'static>(&self, reader: &mut READ, timeout: Duration, desired_size: Option<usize>) -> Vec<u8> {
110 let mut buf = bytes::BytesMut::new();
111 self.read_with_timeout_bytes(reader, &mut buf, timeout, desired_size).await;
112 buf.to_vec()
113 }
114}
115
#[cfg(test)]
mod test {
    use crate::{AsyncReadToBuf, ZeroReadBehaviour};
    use std::io::Cursor;
    use std::time::Duration;

    /// Requesting 5 bytes from an 11-byte source must return exactly the
    /// first 5 bytes.
    #[tokio::test]
    async fn test_limited_read() {
        let mut source = Cursor::new(b"hello world");
        let helper = AsyncReadToBuf::new_1024(ZeroReadBehaviour::TickAndYield);

        let received = helper
            .read_with_timeout(&mut source, Duration::from_secs(1), Some(5))
            .await;

        assert_eq!(received, b"hello");
    }

    /// Requesting more bytes than one 1024-byte chunk can hold must be
    /// satisfied across several reads and still stop at the requested size.
    #[tokio::test]
    async fn test_large_read() {
        let mut source = Cursor::new([0xff_u8; 4096]);
        let helper = AsyncReadToBuf::new_1024(ZeroReadBehaviour::TickAndYield);

        let received = helper
            .read_with_timeout(&mut source, Duration::from_secs(1), Some(2050))
            .await;

        assert_eq!(received, &[0xff_u8; 2050]);
    }
}