block_device_adapters/
stream_slice.rs1use core::cmp;
2use core::fmt::Debug;
3use embedded_io_async::{Read, Seek, SeekFrom, Write};
4
/// Errors reported by [`StreamSlice`] operations.
///
/// `T` is the error type of the wrapped stream; it is carried unchanged in
/// [`StreamSliceError::Other`].
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamSliceError<T>
where
    T: Debug,
{
    /// A requested position was not acceptable for the slice; the payload is
    /// the offending offset.
    InvalidSeek(i64),
    /// A write made no progress (zero bytes were accepted).
    WriteZero,
    /// An error produced by the wrapped stream.
    Other(T),
}
13
14impl<E: Debug> From<E> for StreamSliceError<E> {
15 fn from(e: E) -> Self {
16 Self::Other(e)
17 }
18}
19
20pub struct StreamSlice<T: Read + Write + Seek> {
22 inner: T,
23 start_offset: u64,
24 current_offset: u64,
25 size: u64,
26}
27
28impl<E: Debug> embedded_io_async::Error for StreamSliceError<E> {
29 fn kind(&self) -> embedded_io_async::ErrorKind {
30 match self {
31 StreamSliceError::InvalidSeek(_) => embedded_io_async::ErrorKind::InvalidInput,
32 StreamSliceError::Other(_) | StreamSliceError::WriteZero => {
33 embedded_io_async::ErrorKind::Other
34 }
35 }
36 }
37}
38
39impl<T: Read + Write + Seek> embedded_io_async::ErrorType for StreamSlice<T> {
40 type Error = StreamSliceError<T::Error>;
41}
42
43impl<T: Read + Write + Seek> StreamSlice<T> {
44 pub async fn new(
50 mut inner: T,
51 start_offset: u64,
52 end_offset: u64,
53 ) -> Result<Self, StreamSliceError<T::Error>> {
54 debug_assert!(end_offset >= start_offset);
55 inner.seek(SeekFrom::Start(start_offset)).await?;
56 let size = end_offset - start_offset;
57 Ok(StreamSlice {
58 start_offset,
59 size,
60 inner,
61 current_offset: 0,
62 })
63 }
64
65 pub fn into_inner(self) -> T {
67 self.inner
68 }
69}
70
71impl<T: Read + Write + Seek> Read for StreamSlice<T> {
72 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, StreamSliceError<T::Error>> {
73 let max_read_size = cmp::min((self.size - self.current_offset) as usize, buf.len());
74 let bytes_read = self.inner.read(&mut buf[..max_read_size]).await?;
75 self.current_offset += bytes_read as u64;
76 Ok(bytes_read)
77 }
78}
79
80impl<T: Read + Write + Seek> Write for StreamSlice<T> {
81 async fn write(&mut self, buf: &[u8]) -> Result<usize, StreamSliceError<T::Error>> {
82 let max_write_size = cmp::min((self.size - self.current_offset) as usize, buf.len());
83 let bytes_written = self.inner.write(&buf[..max_write_size]).await?;
84 if bytes_written == 0 {
85 return Err(StreamSliceError::WriteZero);
86 }
87 self.current_offset += bytes_written as u64;
88 Ok(bytes_written)
89 }
90
91 async fn flush(&mut self) -> Result<(), StreamSliceError<T::Error>> {
92 self.inner.flush().await?;
93 Ok(())
94 }
95}
96
97impl<T: Read + Write + Seek> Seek for StreamSlice<T> {
98 async fn seek(&mut self, pos: SeekFrom) -> Result<u64, StreamSliceError<T::Error>> {
99 let new_offset = match pos {
100 SeekFrom::Current(x) => self.current_offset as i64 + x,
101 SeekFrom::Start(x) => x as i64,
102 SeekFrom::End(x) => self.size as i64 + x,
103 };
104 if new_offset < 0 || new_offset as u64 > self.size {
105 Err(StreamSliceError::InvalidSeek(new_offset))
106 } else {
107 self.inner
108 .seek(SeekFrom::Start(self.start_offset + new_offset as u64))
109 .await?;
110 self.current_offset = new_offset as u64;
111 Ok(self.current_offset)
112 }
113 }
114}
115
#[cfg(test)]
mod test {
    use super::*;

    /// Reads `io` until a read returns zero bytes and decodes everything that
    /// was read as UTF-8 (panicking if it is not valid UTF-8).
    async fn read_to_string<IO: embedded_io_async::Read>(io: &mut IO) -> Result<String, IO::Error> {
        let mut collected = Vec::new();
        let mut chunk = [0u8; 256];
        loop {
            let count = io.read(&mut chunk).await?;
            if count == 0 {
                break;
            }
            collected.extend_from_slice(&chunk[..count]);
        }

        Ok(String::from_utf8(collected).unwrap())
    }

    /// Exercises reading, seeking and writing through a slice that exposes
    /// bytes 6..15 of a 20-byte in-memory buffer.
    #[tokio::test]
    async fn stream_test() {
        let _ = env_logger::builder().is_test(true).try_init();

        let backing = String::from("BeforeTest dataAfter").into_bytes();
        let adapter = embedded_io_adapters::tokio_1::FromTokio::new(std::io::Cursor::new(backing));
        let mut stream = StreamSlice::new(adapter, 6, 6 + 9).await.unwrap();

        // A full read yields exactly the window, not the surrounding bytes.
        let whole = read_to_string(&mut stream).await.unwrap();
        assert_eq!(whole, "Test data");

        // Seeking is relative to the start of the window.
        stream.seek(SeekFrom::Start(5)).await.unwrap();
        let tail = read_to_string(&mut stream).await.unwrap();
        assert_eq!(tail, "data");

        // Writes land inside the window and fail once the window is full.
        stream.seek(SeekFrom::Start(5)).await.unwrap();
        stream.write_all("Rust".as_bytes()).await.unwrap();
        assert!(stream.write_all("X".as_bytes()).await.is_err());

        stream.seek(SeekFrom::Start(0)).await.unwrap();
        let rewritten = read_to_string(&mut stream).await.unwrap();
        assert_eq!(rewritten, "Test Rust");
    }
}