1use embedded_io_async::{BufRead, Read, Write};
2
3use super::BypassError;
4
5pub struct BufferedRead<'buf, T: Read> {
9 inner: T,
10 buf: &'buf mut [u8],
11 offset: usize,
12 available: usize,
13}
14
15impl<'buf, T: Read> BufferedRead<'buf, T> {
16 pub fn new(inner: T, buf: &'buf mut [u8]) -> Self {
18 Self {
19 inner,
20 buf,
21 offset: 0,
22 available: 0,
23 }
24 }
25
26 pub fn new_with_data(inner: T, buf: &'buf mut [u8], offset: usize, available: usize) -> Self {
31 assert!(offset + available <= buf.len());
32 Self {
33 inner,
34 buf,
35 offset,
36 available,
37 }
38 }
39
40 pub fn is_empty(&self) -> bool {
42 self.available == 0
43 }
44
45 pub fn available(&self) -> usize {
47 self.available
48 }
49
50 pub fn bypass(&mut self) -> Result<&mut T, BypassError> {
52 match self.available {
53 0 => Ok(&mut self.inner),
54 _ => Err(BypassError),
55 }
56 }
57
58 pub fn release(self) -> T {
60 self.inner
61 }
62}
63
64impl<T: Read> embedded_io::ErrorType for BufferedRead<'_, T> {
65 type Error = T::Error;
66}
67
68impl<T: Read + Write> Write for BufferedRead<'_, T> {
69 async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
70 self.inner.write(buf).await
71 }
72
73 async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
74 self.inner.write_all(buf).await
75 }
76
77 async fn flush(&mut self) -> Result<(), Self::Error> {
78 self.inner.flush().await
79 }
80}
81
82impl<T: Read> Read for BufferedRead<'_, T> {
83 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
84 if self.available == 0 {
85 if buf.len() >= self.buf.len() {
86 return self.inner.read(buf).await;
88 }
89 self.offset = 0;
90 self.available = self.inner.read(self.buf).await?;
91 }
92
93 let len = usize::min(self.available, buf.len());
94 buf[..len].copy_from_slice(&self.buf[self.offset..self.offset + len]);
95 if len < self.available {
96 self.offset += len;
98 self.available -= len;
99 } else {
100 self.available = 0;
102 }
103
104 Ok(len)
105 }
106}
107
108impl<T: Read> BufRead for BufferedRead<'_, T> {
109 async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
110 if self.available == 0 {
111 self.offset = 0;
112 self.available = self.inner.read(self.buf).await?;
113 }
114
115 Ok(&self.buf[self.offset..self.offset + self.available])
116 }
117
118 fn consume(&mut self, amt: usize) {
119 assert!(amt <= self.available);
120 self.offset += amt;
121 self.available -= amt;
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128
129 #[tokio::test]
130 async fn can_read_to_buffer() {
131 let inner = [1, 2, 3, 4, 5, 6, 7, 8];
132 let mut buf = [0; 8];
133 let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);
134
135 let mut read_buf = [0; 2];
136 assert_eq!(2, buffered.read(&mut read_buf).await.unwrap());
137 assert_eq!(2, buffered.offset);
138 assert_eq!(6, buffered.available);
139 assert_eq!(&[1, 2], read_buf.as_slice());
140
141 let mut read_buf = [0; 2];
142 assert_eq!(2, buffered.read(&mut read_buf).await.unwrap());
143 assert_eq!(4, buffered.offset);
144 assert_eq!(4, buffered.available);
145 assert_eq!(&[3, 4], read_buf.as_slice());
146
147 let mut read_buf = [0; 8];
148 assert_eq!(4, buffered.read(&mut read_buf).await.unwrap());
149 assert_eq!(4, buffered.offset);
150 assert_eq!(0, buffered.available);
151 assert_eq!(&[5, 6, 7, 8], &read_buf[..4]);
152 }
153
154 #[tokio::test]
155 async fn bypass_on_large_buf() {
156 let inner = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
157 let mut buf = [0; 8];
158 let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);
159
160 let mut read_buf = [0; 10];
161 assert_eq!(10, buffered.read(&mut read_buf).await.unwrap());
162 assert_eq!(0, buffered.offset);
163 assert_eq!(0, buffered.available);
164 assert_eq!(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], read_buf.as_slice());
165 }
166
167 #[tokio::test]
168 async fn can_buf_read() {
169 let inner = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
170 let mut buf = [0; 8];
171 let mut buffered = BufferedRead::new(inner.as_slice(), &mut buf);
172 assert_eq!(0, buffered.offset);
173 assert_eq!(0, buffered.available);
174
175 assert_eq!(
176 &[1, 2, 3, 4, 5, 6, 7, 8],
177 buffered.fill_buf().await.unwrap()
178 );
179 assert_eq!(0, buffered.offset);
180 assert_eq!(8, buffered.available);
181
182 buffered.consume(2);
183 assert_eq!(2, buffered.offset);
184 assert_eq!(6, buffered.available);
185 assert_eq!(&[3, 4, 5, 6, 7, 8], buffered.fill_buf().await.unwrap());
186
187 buffered.consume(6);
188 assert_eq!(8, buffered.offset);
189 assert_eq!(0, buffered.available);
190
191 assert_eq!(&[9, 10], buffered.fill_buf().await.unwrap());
192 assert_eq!(0, buffered.offset);
193 assert_eq!(2, buffered.available);
194
195 buffered.consume(2);
196 assert_eq!(2, buffered.offset);
197 assert_eq!(0, buffered.available);
198 }
199}