1use std::time::Duration;
2
3use crate::io::{AsyncCacheRead, AsyncReadCacheTimeoutExt, AsyncReadTimeoutExt, PipeError};
4use ascii::AsciiString;
5use tokio::io::{AsyncRead, AsyncReadExt};
6
7pub trait PipeRead: AsyncRead + AsyncCacheRead {
8 fn get_timeout(&self) -> Duration;
9 fn set_timeout(&mut self, timeout: Duration);
10 fn get_block_size(&self) -> usize;
11 fn set_block_size(&mut self, block_size: usize);
12}
13
14impl<R: PipeRead> PipeReadExt for R {}
15
16pub trait PipeReadExt: PipeRead {
17 async fn recv(&mut self) -> Result<Vec<u8>, PipeError>
18 where
19 Self: Unpin,
20 {
21 let mut data = vec![0u8; self.get_block_size()];
22 let _ = self.read_timeout(&mut data, self.get_timeout()).await?;
23 Ok(data)
24 }
25
26 async fn recv_all(&mut self) -> Result<Vec<u8>, PipeError>
27 where
28 Self: Unpin,
29 {
30 let mut data = vec![];
31 let _ = self.read_to_end(&mut data).await?;
32 Ok(data)
33 }
34
35 async fn recv_all_timeout(
36 &mut self,
37 timeout: Duration,
38 keep_data: bool,
39 ) -> Result<Vec<u8>, PipeError>
40 where
41 Self: Unpin,
42 {
43 let mut data = vec![];
44 let _ = self
45 .read_to_end_timeout(&mut data, timeout, !keep_data)
46 .await?;
47 Ok(data)
48 }
49
50 async fn recvn(&mut self, len: usize) -> Result<Vec<u8>, PipeError>
51 where
52 Self: Unpin,
53 {
54 let mut data = vec![0u8; len];
55 let _ = self.read_timeout(&mut data, self.get_timeout()).await?;
56 Ok(data)
57 }
58
59 async fn recvn_fill(&mut self, len: usize) -> Result<Vec<u8>, PipeError>
60 where
61 Self: Unpin,
62 {
63 let mut data = vec![0u8; len];
64 let _ = self
65 .read_fill_timeout(&mut data, self.get_timeout())
66 .await?;
67 Ok(data)
68 }
69
70 async fn recvn_exact(&mut self, len: usize) -> Result<Vec<u8>, PipeError>
71 where
72 Self: Unpin,
73 {
74 let mut data = vec![0u8; len];
75 let _ = self
76 .read_exact_timeout(&mut data, self.get_timeout())
77 .await?;
78 Ok(data)
79 }
80
81 async fn recv_until<T: AsRef<[u8]>>(
82 &mut self,
83 delim: T,
84 drop: bool,
85 ) -> Result<Vec<u8>, PipeError>
86 where
87 Self: Unpin,
88 {
89 let mut buf = Vec::new();
90 let delim_len = delim.as_ref().len();
91 self.read_until_timeout(delim, &mut buf, self.get_timeout())
92 .await?;
93 if drop {
94 buf.drain(buf.len() - delim_len..);
95 }
96 Ok(buf)
97 }
98
99 async fn recv_until_regex(&mut self, pattern: &str, drop: bool) -> Result<Vec<u8>, PipeError>
100 where
101 Self: Unpin,
102 {
103 let mut buf = Vec::new();
104 let (_, match_len) = self
105 .read_until_regex_timeout(pattern, &mut buf, self.get_timeout())?
106 .await?;
107 if drop {
108 buf.drain(buf.len() - match_len..);
109 }
110 Ok(buf)
111 }
112
113 async fn recv_regex(&mut self, pattern: &str) -> Result<Vec<u8>, PipeError>
114 where
115 Self: Unpin,
116 {
117 let mut buf = Vec::new();
118 let (_, match_len) = self
119 .read_until_regex_timeout(pattern, &mut buf, self.get_timeout())?
120 .await?;
121 buf.drain(..buf.len() - match_len);
122 Ok(buf)
123 }
124
125 async fn recv_until_regex_split(
126 &mut self,
127 pattern: &str,
128 ) -> Result<(Vec<u8>, Vec<u8>), PipeError>
129 where
130 Self: Unpin,
131 {
132 let mut buf = Vec::new();
133 let (_, match_len) = self
134 .read_until_regex_timeout(pattern, &mut buf, self.get_timeout())?
135 .await?;
136 let (data, mch) = buf.split_at(buf.len() - match_len);
137 Ok((data.to_vec(), mch.to_vec()))
138 }
139
140 async fn recv_line(&mut self) -> Result<Vec<u8>, PipeError>
141 where
142 Self: Unpin,
143 {
144 self.recv_until(b"\n", true).await
145 }
146
147 async fn recv_line_crlf(&mut self) -> Result<Vec<u8>, PipeError>
148 where
149 Self: Unpin,
150 {
151 self.recv_until(b"\r\n", true).await
152 }
153
154 async fn recv_utf8(&mut self) -> Result<String, PipeError>
155 where
156 Self: Unpin,
157 {
158 let data = self.recv().await?;
159 Ok(String::from_utf8(data)?)
160 }
161
162 async fn recv_until_utf8<T: AsRef<[u8]>>(
163 &mut self,
164 delim: T,
165 drop: bool,
166 ) -> Result<String, PipeError>
167 where
168 Self: Unpin,
169 {
170 let data = self.recv_until(delim, drop).await?;
171 Ok(String::from_utf8(data)?)
172 }
173
174 async fn recv_until_regex_utf8(
175 &mut self,
176 pattern: &str,
177 drop: bool,
178 ) -> Result<String, PipeError>
179 where
180 Self: Unpin,
181 {
182 let data = self.recv_until_regex(pattern, drop).await?;
183 Ok(String::from_utf8(data)?)
184 }
185
186 async fn recv_regex_utf8(&mut self, pattern: &str) -> Result<String, PipeError>
187 where
188 Self: Unpin,
189 {
190 let data = self.recv_regex(pattern).await?;
191 Ok(String::from_utf8(data)?)
192 }
193
194 async fn recv_line_utf8(&mut self) -> Result<String, PipeError>
195 where
196 Self: Unpin,
197 {
198 let data = self.recv_line().await?;
199 Ok(String::from_utf8(data)?)
200 }
201
202 async fn recv_line_crlf_utf8(&mut self) -> Result<String, PipeError>
203 where
204 Self: Unpin,
205 {
206 let data = self.recv_line_crlf().await?;
207 Ok(String::from_utf8(data)?)
208 }
209
210 async fn recv_ascii(&mut self) -> Result<AsciiString, PipeError>
211 where
212 Self: Unpin,
213 {
214 let data = self.recv().await?;
215 Ok(AsciiString::from_ascii(data)?)
216 }
217
218 async fn recv_until_ascii<T: AsRef<[u8]>>(
219 &mut self,
220 delim: T,
221 drop: bool,
222 ) -> Result<AsciiString, PipeError>
223 where
224 Self: Unpin,
225 {
226 let data = self.recv_until(delim, drop).await?;
227 Ok(AsciiString::from_ascii(data)?)
228 }
229
230 async fn recv_until_regex_ascii(
231 &mut self,
232 pattern: &str,
233 drop: bool,
234 ) -> Result<AsciiString, PipeError>
235 where
236 Self: Unpin,
237 {
238 let data = self.recv_until_regex(pattern, drop).await?;
239 Ok(AsciiString::from_ascii(data)?)
240 }
241
242 async fn recv_regex_ascii(&mut self, pattern: &str) -> Result<AsciiString, PipeError>
243 where
244 Self: Unpin,
245 {
246 let data = self.recv_regex(pattern).await?;
247 Ok(AsciiString::from_ascii(data)?)
248 }
249
250 async fn recv_line_ascii(&mut self) -> Result<AsciiString, PipeError>
251 where
252 Self: Unpin,
253 {
254 let data = self.recv_line().await?;
255 Ok(AsciiString::from_ascii(data)?)
256 }
257
258 async fn recv_line_crlf_ascii(&mut self) -> Result<AsciiString, PipeError>
259 where
260 Self: Unpin,
261 {
262 let data = self.recv_line_crlf().await?;
263 Ok(AsciiString::from_ascii(data)?)
264 }
265}