1use crate::DEFAULT_BUFFER_SIZE;
2use futures_lite::{io, ready, AsyncBufRead, AsyncRead};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6pub trait MapReadFn {
8 fn map_read(&mut self, buf: &mut [u8]);
14}
15
16impl<F> MapReadFn for F
17where
18 F: FnMut(&mut [u8]),
19{
20 fn map_read(&mut self, buf: &mut [u8]) {
21 self(buf)
22 }
23}
24
25pin_project_lite::pin_project! {
26 pub struct AsyncMapReader<'a, R> {
37 #[pin]
38 inner: R,
39 process_fn: Box<dyn MapReadFn + 'a>,
40 pos: usize, cap: usize, buf: Box<[u8]>, }
44}
45
46impl<'a, R> AsyncMapReader<'a, R>
47where
48 R: AsyncRead,
49{
50 pub fn new(reader: R, process_fn: impl MapReadFn + 'a) -> Self {
52 Self {
53 inner: reader,
54 process_fn: Box::new(process_fn),
55 pos: 0,
56 cap: 0,
57 buf: vec![0; DEFAULT_BUFFER_SIZE].into_boxed_slice(), }
59 }
60
61 pub fn with_capacity(reader: R, process_fn: impl MapReadFn + 'a, capacity: usize) -> Self {
63 Self {
64 inner: reader,
65 process_fn: Box::new(process_fn),
66 pos: 0,
67 cap: 0,
68 buf: vec![0; capacity].into_boxed_slice(),
69 }
70 }
71
72 pub fn into_inner(self) -> R {
74 self.inner
75 }
76}
77
78impl<'a, R> AsyncRead for AsyncMapReader<'a, R>
79where
80 R: AsyncRead,
81{
82 fn poll_read(
83 mut self: Pin<&mut Self>,
84 cx: &mut Context<'_>,
85 buf: &mut [u8],
86 ) -> Poll<io::Result<usize>> {
87 if self.pos == self.cap {
88 let fill = ready!(self.as_mut().poll_fill_buf(cx))?;
89 if fill.is_empty() {
90 return Poll::Ready(Ok(0));
91 }
92 }
93 let rem = {
94 let this = self.as_mut().project();
95 &this.buf[*this.pos..*this.cap]
96 };
97 let amt = std::cmp::min(rem.len(), buf.len());
98 buf[..amt].copy_from_slice(&rem[..amt]);
99 self.consume(amt);
100 Poll::Ready(Ok(amt))
101 }
102}
103
104impl<'a, R: AsyncRead> AsyncBufRead for AsyncMapReader<'a, R> {
105 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
106 let mut this = self.project();
107 if *this.pos >= *this.cap {
108 debug_assert!(*this.pos == *this.cap);
109 *this.pos = 0;
110 *this.cap = 0;
111 let read_amount = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
112 if read_amount == 0 {
113 return Poll::Ready(Ok(&[]));
114 }
115 (this.process_fn).map_read(&mut this.buf[..read_amount]);
116 *this.cap = read_amount;
117 }
118 Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
119 }
120
121 fn consume(self: Pin<&mut Self>, amt: usize) {
122 let this = self.project();
123 *this.pos = std::cmp::min(*this.pos + amt, *this.cap);
124 }
125}
126
127pub trait AsyncMapRead<'a, R> {
129 fn map(self, f: impl MapReadFn + 'a) -> AsyncMapReader<'a, R>
132 where
133 Self: Sized,
134 {
135 self.map_with_capacity(f, DEFAULT_BUFFER_SIZE)
136 }
137
138 fn map_with_capacity(self, f: impl MapReadFn + 'a, capacity: usize) -> AsyncMapReader<'a, R>;
144}
145
146impl<'a, R: AsyncRead> AsyncMapRead<'a, R> for R {
147 fn map_with_capacity(self, f: impl MapReadFn + 'a, capacity: usize) -> AsyncMapReader<'a, R> {
148 AsyncMapReader::with_capacity(self, f, capacity)
149 }
150}