block_device_adapters/
stream_slice.rs

1use core::cmp;
2use core::fmt::Debug;
3use embedded_io_async::{Read, Seek, SeekFrom, Write};
4
5#[cfg_attr(feature = "defmt", derive(defmt::Format))]
6#[derive(Debug)]
7#[non_exhaustive]
8pub enum StreamSliceError<T: Debug> {
9    InvalidSeek(i64),
10    WriteZero,
11    Other(T),
12}
13
14impl<E: Debug> From<E> for StreamSliceError<E> {
15    fn from(e: E) -> Self {
16        Self::Other(e)
17    }
18}
19
20/// Stream wrapper for accessing limited segment of data from underlying file or device.
21pub struct StreamSlice<T: Read + Write + Seek> {
22    inner: T,
23    start_offset: u64,
24    current_offset: u64,
25    size: u64,
26}
27
28impl<E: Debug> embedded_io_async::Error for StreamSliceError<E> {
29    fn kind(&self) -> embedded_io_async::ErrorKind {
30        match self {
31            StreamSliceError::InvalidSeek(_) => embedded_io_async::ErrorKind::InvalidInput,
32            StreamSliceError::Other(_) | StreamSliceError::WriteZero => {
33                embedded_io_async::ErrorKind::Other
34            }
35        }
36    }
37}
38
39impl<T: Read + Write + Seek> embedded_io_async::ErrorType for StreamSlice<T> {
40    type Error = StreamSliceError<T::Error>;
41}
42
43impl<T: Read + Write + Seek> StreamSlice<T> {
44    /// Creates new `StreamSlice` from inner stream and offset range.
45    ///
46    /// `start_offset` is inclusive offset of the first accessible byte.
47    /// `end_offset` is exclusive offset of the first non-accessible byte.
48    /// `start_offset` must be lower or equal to `end_offset`.
49    pub async fn new(
50        mut inner: T,
51        start_offset: u64,
52        end_offset: u64,
53    ) -> Result<Self, StreamSliceError<T::Error>> {
54        debug_assert!(end_offset >= start_offset);
55        inner.seek(SeekFrom::Start(start_offset)).await?;
56        let size = end_offset - start_offset;
57        Ok(StreamSlice {
58            start_offset,
59            size,
60            inner,
61            current_offset: 0,
62        })
63    }
64
65    /// Returns inner object
66    pub fn into_inner(self) -> T {
67        self.inner
68    }
69}
70
71impl<T: Read + Write + Seek> Read for StreamSlice<T> {
72    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, StreamSliceError<T::Error>> {
73        let max_read_size = cmp::min((self.size - self.current_offset) as usize, buf.len());
74        let bytes_read = self.inner.read(&mut buf[..max_read_size]).await?;
75        self.current_offset += bytes_read as u64;
76        Ok(bytes_read)
77    }
78}
79
80impl<T: Read + Write + Seek> Write for StreamSlice<T> {
81    async fn write(&mut self, buf: &[u8]) -> Result<usize, StreamSliceError<T::Error>> {
82        let max_write_size = cmp::min((self.size - self.current_offset) as usize, buf.len());
83        let bytes_written = self.inner.write(&buf[..max_write_size]).await?;
84        if bytes_written == 0 {
85            return Err(StreamSliceError::WriteZero);
86        }
87        self.current_offset += bytes_written as u64;
88        Ok(bytes_written)
89    }
90
91    async fn flush(&mut self) -> Result<(), StreamSliceError<T::Error>> {
92        self.inner.flush().await?;
93        Ok(())
94    }
95}
96
97impl<T: Read + Write + Seek> Seek for StreamSlice<T> {
98    async fn seek(&mut self, pos: SeekFrom) -> Result<u64, StreamSliceError<T::Error>> {
99        let new_offset = match pos {
100            SeekFrom::Current(x) => self.current_offset as i64 + x,
101            SeekFrom::Start(x) => x as i64,
102            SeekFrom::End(x) => self.size as i64 + x,
103        };
104        if new_offset < 0 || new_offset as u64 > self.size {
105            Err(StreamSliceError::InvalidSeek(new_offset))
106        } else {
107            self.inner
108                .seek(SeekFrom::Start(self.start_offset + new_offset as u64))
109                .await?;
110            self.current_offset = new_offset as u64;
111            Ok(self.current_offset)
112        }
113    }
114}
115
116#[cfg(test)]
117mod test {
118    use super::*;
119
120    #[tokio::test]
121    async fn stream_test() {
122        let _ = env_logger::builder().is_test(true).try_init();
123        let buf = "BeforeTest dataAfter".to_string().into_bytes();
124        let cur = std::io::Cursor::new(buf);
125        let mut stream =
126            StreamSlice::new(embedded_io_adapters::tokio_1::FromTokio::new(cur), 6, 6 + 9)
127                .await
128                .unwrap();
129
130        let data = read_to_string(&mut stream).await.unwrap();
131        assert_eq!(data, "Test data");
132
133        stream.seek(SeekFrom::Start(5)).await.unwrap();
134        let data = read_to_string(&mut stream).await.unwrap();
135        assert_eq!(data, "data");
136
137        stream.seek(SeekFrom::Start(5)).await.unwrap();
138        stream.write_all("Rust".as_bytes()).await.unwrap();
139        assert!(stream.write_all("X".as_bytes()).await.is_err());
140        stream.seek(SeekFrom::Start(0)).await.unwrap();
141        let data = read_to_string(&mut stream).await.unwrap();
142        assert_eq!(data, "Test Rust");
143    }
144
145    async fn read_to_string<IO: embedded_io_async::Read>(io: &mut IO) -> Result<String, IO::Error> {
146        let mut buf = Vec::new();
147        loop {
148            let mut tmp = [0; 256];
149            match io.read(&mut tmp).await {
150                Ok(0) => break,
151                Ok(n) => buf.extend(&tmp[..n]),
152                Err(e) => return Err(e),
153            }
154        }
155
156        Ok(String::from_utf8(buf).unwrap())
157    }
158}