1mod copy;
22
23use std::io;
24use std::io::ErrorKind;
25
26use async_trait::async_trait;
27use futures::io::{ReadHalf, WriteHalf};
28use futures::prelude::*;
29use futures::{AsyncReadExt, AsyncWriteExt};
30
31pub use copy::copy;
32
33#[async_trait]
36pub trait ReadEx: Send {
37 async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error>;
56
57 async fn read_exact2<'a>(&'a mut self, buf: &'a mut [u8]) -> Result<(), io::Error> {
62 let mut buf_piece = buf;
63 while !buf_piece.is_empty() {
64 let n = self.read2(buf_piece).await?;
65 if n == 0 {
66 return Err(ErrorKind::UnexpectedEof.into());
67 }
68
69 let (_, rest) = buf_piece.split_at_mut(n);
70 buf_piece = rest;
71 }
72 Ok(())
73 }
74
75 async fn read_fixed_u32(&mut self) -> Result<usize, io::Error> {
80 let mut len = [0; 4];
81 self.read_exact2(&mut len).await?;
82 let n = u32::from_be_bytes(len) as usize;
83
84 Ok(n)
85 }
86
87 async fn read_varint(&mut self) -> Result<usize, io::Error> {
98 let mut buffer = unsigned_varint::encode::usize_buffer();
99 let mut buffer_len = 0;
100
101 loop {
102 match self.read2(&mut buffer[buffer_len..=buffer_len]).await? {
103 0 => {
104 if buffer_len == 0 {
108 return Ok(0);
109 } else {
110 return Err(io::ErrorKind::UnexpectedEof.into());
111 }
112 }
113 n => debug_assert_eq!(n, 1),
114 }
115
116 buffer_len += 1;
117
118 match unsigned_varint::decode::usize(&buffer[..buffer_len]) {
119 Ok((len, _)) => return Ok(len),
120 Err(unsigned_varint::decode::Error::Overflow) => {
121 return Err(io::Error::new(io::ErrorKind::InvalidData, "overflow in variable-length integer"));
122 }
123 Err(_) => {}
126 }
127 }
128 }
129
130 async fn read_one_fixed(&mut self, max_size: usize) -> Result<Vec<u8>, io::Error> {
139 let len = self.read_fixed_u32().await?;
140 if len > max_size {
141 return Err(io::Error::new(
142 io::ErrorKind::InvalidData,
143 format!("Received data size over maximum frame length: {}>{}", len, max_size),
144 ));
145 }
146
147 let mut buf = vec![0; len];
148 self.read_exact2(&mut buf).await?;
149 Ok(buf)
150 }
151
152 async fn read_one(&mut self, max_size: usize) -> Result<Vec<u8>, io::Error> {
164 let len = self.read_varint().await?;
165 if len > max_size {
166 return Err(io::Error::new(
167 io::ErrorKind::InvalidData,
168 format!("Received data size over maximum frame length: {}>{}", len, max_size),
169 ));
170 }
171
172 let mut buf = vec![0; len];
173 self.read_exact2(&mut buf).await?;
174 Ok(buf)
175 }
176}
177
178#[async_trait]
181pub trait WriteEx: Send {
182 async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error>;
187 async fn write_all2(&mut self, buf: &[u8]) -> Result<(), io::Error> {
194 let mut buf_piece = buf;
195 while !buf_piece.is_empty() {
196 let n = self.write2(buf_piece).await?;
197 if n == 0 {
198 return Err(io::ErrorKind::WriteZero.into());
199 }
200
201 let (_, rest) = buf_piece.split_at(n);
202 buf_piece = rest;
203 }
204 Ok(())
205 }
206
207 async fn write_varint(&mut self, len: usize) -> Result<(), io::Error> {
214 let mut len_data = unsigned_varint::encode::usize_buffer();
215 let encoded_len = unsigned_varint::encode::usize(len, &mut len_data).len();
216 self.write_all2(&len_data[..encoded_len]).await?;
217 Ok(())
218 }
219
220 async fn write_fixed_u32(&mut self, len: usize) -> Result<(), io::Error> {
227 self.write_all2((len as u32).to_be_bytes().as_ref()).await?;
228 Ok(())
229 }
230
231 async fn write_one_fixed(&mut self, buf: &[u8]) -> Result<(), io::Error> {
236 self.write_fixed_u32(buf.len()).await?;
237 self.write_all2(buf).await?;
238 self.flush2().await?;
239 Ok(())
240 }
241
242 async fn write_one(&mut self, buf: &[u8]) -> Result<(), io::Error> {
250 self.write_varint(buf.len()).await?;
251 self.write_all2(buf).await?;
252 self.flush2().await?;
253 Ok(())
254 }
255
256 async fn flush2(&mut self) -> Result<(), io::Error>;
262
263 async fn close2(&mut self) -> Result<(), io::Error>;
268}
269
270#[async_trait]
271impl<T: AsyncRead + Unpin + Send> ReadEx for T {
272 async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
273 let n = AsyncReadExt::read(self, buf).await?;
274 Ok(n)
275 }
276}
277
278#[async_trait]
279impl<T: AsyncWrite + Unpin + Send> WriteEx for T {
280 async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
281 AsyncWriteExt::write(self, buf).await
282 }
283
284 async fn flush2(&mut self) -> Result<(), io::Error> {
285 AsyncWriteExt::flush(self).await
286 }
287
288 async fn close2(&mut self) -> Result<(), io::Error> {
289 AsyncWriteExt::close(self).await
290 }
291}
292
/// Types that can be consumed and divided into a separate reader and writer.
pub trait SplitEx {
    /// The read part produced by `split`; must implement `ReadEx`.
    type Reader: ReadEx + Unpin;
    /// The write part produced by `split`; must implement `WriteEx`.
    type Writer: WriteEx + Unpin;

    /// Consumes `self` and returns the `(reader, writer)` pair.
    fn split(self) -> (Self::Reader, Self::Writer);
}
305
306impl<T: AsyncRead + AsyncWrite + Send + Unpin> SplitEx for T {
308 type Reader = ReadHalf<T>;
309 type Writer = WriteHalf<T>;
310
311 fn split(self) -> (Self::Reader, Self::Writer) {
312 futures::AsyncReadExt::split(self)
313 }
314}
315
/// Convenience trait bundling `ReadEx + WriteEx + SplitEx + Unpin + 'static`
/// into a single bound. It declares no methods of its own.
pub trait SplittableReadWrite: ReadEx + WriteEx + SplitEx + Unpin + 'static {}
318
319impl<T: ReadEx + WriteEx + SplitEx + Unpin + 'static> SplittableReadWrite for T {}
320
#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::{self, AsyncReadExt, Cursor};
    use libp2prs_runtime::task;

    /// In-memory stream used to exercise the default methods of `ReadEx` and
    /// `WriteEx` through hand-written implementations of the primitives.
    struct Test(Cursor<Vec<u8>>);

    #[async_trait]
    impl ReadEx for Test {
        async fn read2(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
            self.0.read(buf).await
        }
    }

    #[async_trait]
    impl WriteEx for Test {
        async fn write2(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
            self.0.write(buf).await
        }

        async fn flush2(&mut self) -> Result<(), io::Error> {
            self.0.flush().await
        }

        async fn close2(&mut self) -> Result<(), io::Error> {
            self.0.close().await
        }
    }

    /// `read2` fills the provided buffer from the start of the stream.
    #[test]
    fn test_read() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![1, 2, 3, 4]));
            let mut output = [0u8; 3];
            let bytes = reader.read2(&mut output[..]).await.unwrap();

            assert_eq!(bytes, 3);
            assert_eq!(output, [1, 2, 3]);
        });
    }

    /// `read2` on textual data yields the raw bytes.
    #[test]
    fn test_read_string() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(b"hello world".to_vec()));
            let mut output = [0u8; 3];
            let bytes = reader.read2(&mut output[..]).await.unwrap();

            assert_eq!(bytes, 3);
            assert_eq!(output, [104, 101, 108]);
        });
    }

    /// `read_exact2` succeeds and fills the whole buffer.
    #[test]
    fn test_read_exact() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![1, 2, 3, 4]));
            let mut output = [0u8; 3];
            // Unwrap so that a failing read is reported instead of ignored.
            reader.read_exact2(&mut output[..]).await.unwrap();

            assert_eq!(output, [1, 2, 3]);
        });
    }

    /// `read_fixed_u32` decodes the first four bytes as big-endian.
    #[test]
    fn test_read_fixed_u32() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(b"hello world".to_vec()));
            let size = reader.read_fixed_u32().await.unwrap();

            // b"hell" == 0x68656c6c
            assert_eq!(size, 1751477356);
        });
    }

    /// `read_varint` decodes a single-byte varint.
    #[test]
    fn test_read_varint() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![1, 2, 3, 4, 5, 6]));
            let size = reader.read_varint().await.unwrap();

            assert_eq!(size, 1);
        });
    }

    /// `read_one` returns the payload that follows the varint length prefix.
    #[test]
    fn test_read_one() {
        task::block_on(async {
            let mut reader = Test(Cursor::new(vec![11, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
            // Unwrap so that an error is reported as such rather than as an
            // unexpected empty payload.
            let output = reader.read_one(11).await.unwrap();

            assert_eq!(output, b"hello world");
        });
    }

    /// `write2` overwrites the start of the underlying buffer.
    #[test]
    fn test_write() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![0u8; 5]));
            let size = writer.write2(&[1, 2, 3, 4]).await.unwrap();

            assert_eq!(size, 4);
            assert_eq!(writer.0.get_mut(), &[1, 2, 3, 4, 0]);
        });
    }

    /// `write_all2` writes the full input even when it exceeds the initial
    /// length of the underlying buffer.
    #[test]
    fn test_write_all2() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![0u8; 4]));
            let output = vec![1, 2, 3, 4, 5];
            writer.write_all2(&output[..]).await.unwrap();

            assert_eq!(writer.0.get_mut(), &[1, 2, 3, 4, 5]);
        });
    }

    /// `write_fixed_u32` emits exactly four big-endian bytes.
    #[test]
    fn test_write_fixed_u32() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(b"hello world".to_vec()));
            writer.write_fixed_u32(1751477356).await.unwrap();

            assert_eq!(writer.0.position(), 4);
            // 1751477356 == 0x68656c6c == b"hell"
            assert_eq!(&writer.0.get_ref()[..4], b"hell");
        });
    }

    /// `write_varint` emits a single byte for small values.
    #[test]
    fn test_write_varint() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![2, 2, 3, 4, 5, 6]));
            writer.write_varint(1).await.unwrap();

            assert_eq!(writer.0.position(), 1);
            assert_eq!(writer.0.get_ref()[0], 1);
        });
    }

    /// `write_one` emits the varint length prefix followed by the payload.
    #[test]
    fn test_write_one() {
        task::block_on(async {
            let mut writer = Test(Cursor::new(vec![0u8; 0]));
            writer.write_one("hello world".as_ref()).await.unwrap();

            assert_eq!(writer.0.get_mut(), &[11, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]);
        });
    }
}