buffered_io/asynch/
read.rs

1use embedded_io_async::{BufRead, Read, Write};
2
3use super::BypassError;
4
5/// A buffered [`Read`]
6///
7/// The BufferedRead will read into the provided buffer to avoid small reads to the inner reader.
8pub struct BufferedRead<'buf, T: Read> {
9    inner: T,
10    buf: &'buf mut [u8],
11    offset: usize,
12    available: usize,
13}
14
15impl<'buf, T: Read> BufferedRead<'buf, T> {
16    /// Create a new buffered reader
17    pub fn new(inner: T, buf: &'buf mut [u8]) -> Self {
18        Self {
19            inner,
20            buf,
21            offset: 0,
22            available: 0,
23        }
24    }
25
26    /// Create a new buffered reader with the first `available` bytes readily available at `offset`.
27    ///
28    /// This is useful if for some reason the inner reader was previously consumed by a greedy reader
29    /// in a way such that the BufferedRead must inherit these excess bytes.
30    pub fn new_with_data(inner: T, buf: &'buf mut [u8], offset: usize, available: usize) -> Self {
31        assert!(offset + available <= buf.len());
32        Self {
33            inner,
34            buf,
35            offset,
36            available,
37        }
38    }
39
40    /// Get whether there are any bytes readily available
41    pub fn is_empty(&self) -> bool {
42        self.available == 0
43    }
44
45    /// Get the number of bytes that are readily availbale
46    pub fn available(&self) -> usize {
47        self.available
48    }
49
50    /// Get the inner reader if there are no currently buffered, available bytes
51    pub fn bypass(&mut self) -> Result<&mut T, BypassError> {
52        match self.available {
53            0 => Ok(&mut self.inner),
54            _ => Err(BypassError),
55        }
56    }
57
58    /// Release and get the inner reader
59    pub fn release(self) -> T {
60        self.inner
61    }
62}
63
64impl<T: Read> embedded_io::ErrorType for BufferedRead<'_, T> {
65    type Error = T::Error;
66}
67
68impl<T: Read + Write> Write for BufferedRead<'_, T> {
69    async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
70        self.inner.write(buf).await
71    }
72
73    async fn write_all(&mut self, buf: &[u8]) -> Result<(), Self::Error> {
74        self.inner.write_all(buf).await
75    }
76
77    async fn flush(&mut self) -> Result<(), Self::Error> {
78        self.inner.flush().await
79    }
80}
81
82impl<T: Read> Read for BufferedRead<'_, T> {
83    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
84        if self.available == 0 {
85            if buf.len() >= self.buf.len() {
86                // Fast path - bypass local buffer
87                return self.inner.read(buf).await;
88            }
89            self.offset = 0;
90            self.available = self.inner.read(self.buf).await?;
91        }
92
93        let len = usize::min(self.available, buf.len());
94        buf[..len].copy_from_slice(&self.buf[self.offset..self.offset + len]);
95        if len < self.available {
96            // There are still bytes left
97            self.offset += len;
98            self.available -= len;
99        } else {
100            // The buffer is drained
101            self.available = 0;
102        }
103
104        Ok(len)
105    }
106}
107
108impl<T: Read> BufRead for BufferedRead<'_, T> {
109    async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
110        if self.available == 0 {
111            self.offset = 0;
112            self.available = self.inner.read(self.buf).await?;
113        }
114
115        Ok(&self.buf[self.offset..self.offset + self.available])
116    }
117
118    fn consume(&mut self, amt: usize) {
119        assert!(amt <= self.available);
120        self.offset += amt;
121        self.available -= amt;
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;

    /// Small reads are served from the internal buffer and advance its bookkeeping.
    #[tokio::test]
    async fn can_read_to_buffer() {
        let source: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
        let mut storage = [0u8; 8];
        let mut reader = BufferedRead::new(&source[..], &mut storage);

        // The first small read fills the internal buffer and hands out two bytes.
        let mut out = [0u8; 2];
        let count = reader.read(&mut out).await.unwrap();
        assert_eq!(count, 2);
        assert_eq!((reader.offset, reader.available), (2, 6));
        assert_eq!(out, [1u8, 2]);

        // The second small read is served from what is already buffered.
        let mut out = [0u8; 2];
        let count = reader.read(&mut out).await.unwrap();
        assert_eq!(count, 2);
        assert_eq!((reader.offset, reader.available), (4, 4));
        assert_eq!(out, [3u8, 4]);

        // A large read drains the remainder; the offset stays put once the buffer is empty.
        let mut out = [0u8; 8];
        let count = reader.read(&mut out).await.unwrap();
        assert_eq!(count, 4);
        assert_eq!((reader.offset, reader.available), (4, 0));
        assert_eq!(out[..4], [5u8, 6, 7, 8]);
    }

    /// A destination at least as large as the internal buffer skips the internal buffer.
    #[tokio::test]
    async fn bypass_on_large_buf() {
        let source: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut storage = [0u8; 8];
        let mut reader = BufferedRead::new(&source[..], &mut storage);

        let mut out = [0u8; 10];
        let count = reader.read(&mut out).await.unwrap();
        assert_eq!(count, 10);
        assert_eq!((reader.offset, reader.available), (0, 0));
        assert_eq!(out, [1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    }

    /// `fill_buf` and `consume` walk through the source one internal buffer at a time.
    #[tokio::test]
    async fn can_buf_read() {
        let source: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut storage = [0u8; 8];
        let mut reader = BufferedRead::new(&source[..], &mut storage);
        assert_eq!((reader.offset, reader.available), (0, 0));

        // The first fill takes as much as the internal buffer can hold.
        let filled = reader.fill_buf().await.unwrap();
        assert_eq!(filled, [1u8, 2, 3, 4, 5, 6, 7, 8]);
        assert_eq!((reader.offset, reader.available), (0, 8));

        reader.consume(2);
        assert_eq!((reader.offset, reader.available), (2, 6));
        let filled = reader.fill_buf().await.unwrap();
        assert_eq!(filled, [3u8, 4, 5, 6, 7, 8]);

        reader.consume(6);
        assert_eq!((reader.offset, reader.available), (8, 0));

        // With everything consumed, the next fill restarts at the front with the tail bytes.
        let filled = reader.fill_buf().await.unwrap();
        assert_eq!(filled, [9u8, 10]);
        assert_eq!((reader.offset, reader.available), (0, 2));

        reader.consume(2);
        assert_eq!((reader.offset, reader.available), (2, 0));
    }
}