rdif_base/
io.rs

1use core::{fmt::Display, future, task::Poll};
2
3use alloc::boxed::Box;
4pub use async_trait::async_trait;
5
6#[async_trait]
7pub trait Read {
8    /// Read data from the device.
9    fn read(&mut self, buf: &mut [u8]) -> Result;
10
11    /// Read data from the device, blocking until all bytes are read
12    fn read_all_blocking(&mut self, buf: &mut [u8]) -> Result {
13        let mut n = 0;
14        while n < buf.len() {
15            let tmp = &mut buf[n..];
16            if let Err(mut e) = self.read(tmp) {
17                n += e.success_pos;
18                if matches!(e.kind, ErrorKind::Interrupted) {
19                    continue;
20                } else {
21                    e.success_pos = n;
22                    return Err(e);
23                }
24            } else {
25                n += tmp.len();
26            }
27        }
28
29        Ok(())
30    }
31
32    async fn read_all(&mut self, buf: &mut [u8]) -> Result {
33        let mut n = 0;
34        future::poll_fn(move |cx| {
35            let tmp = &mut buf[n..];
36            if let Err(mut e) = self.read(tmp) {
37                n += e.success_pos;
38                if !matches!(e.kind, ErrorKind::Interrupted) {
39                    e.success_pos = n;
40                    return Poll::Ready(Err(e));
41                }
42            } else {
43                n += tmp.len();
44            }
45            if n == buf.len() {
46                Poll::Ready(Ok(()))
47            } else {
48                cx.waker().wake_by_ref();
49                Poll::Pending
50            }
51        })
52        .await
53    }
54}
55
56#[async_trait]
57pub trait Write {
58    /// Write data to the device.
59    fn write(&mut self, buf: &[u8]) -> Result;
60
61    fn write_all_blocking(&mut self, buf: &[u8]) -> Result {
62        let mut n = 0;
63        while n < buf.len() {
64            let tmp = &buf[n..];
65            if let Err(mut e) = self.write(tmp) {
66                n += e.success_pos;
67                if matches!(e.kind, ErrorKind::Interrupted) {
68                    continue;
69                } else {
70                    e.success_pos = n;
71                    return Err(e);
72                }
73            } else {
74                n += tmp.len();
75            }
76        }
77        Ok(())
78    }
79
80    async fn write_all(&mut self, buf: &[u8]) -> Result {
81        let mut n = 0;
82        future::poll_fn(move |cx| {
83            let tmp = &buf[n..];
84            if let Err(mut e) = self.write(tmp) {
85                n += e.success_pos;
86                if !matches!(e.kind, ErrorKind::Interrupted) {
87                    e.success_pos = n;
88                    return Poll::Ready(Err(e));
89                }
90            } else {
91                n += tmp.len();
92            }
93            if n == buf.len() {
94                Poll::Ready(Ok(()))
95            } else {
96                cx.waker().wake_by_ref();
97                Poll::Pending
98            }
99        })
100        .await
101    }
102}
103
104pub type Result<T = ()> = core::result::Result<T, Error>;
105
106/// Io error
107#[derive(Debug)]
108pub struct Error {
109    /// The kind of error
110    pub kind: ErrorKind,
111    /// The position of the valid data
112    pub success_pos: usize,
113}
114
115impl Display for Error {
116    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
117        write!(f, "success pos {}, err:{}", self.success_pos, self.kind)
118    }
119}
120
121impl core::error::Error for Error {}
122
123/// Io error kind
124#[derive(thiserror::Error, Debug)]
125pub enum ErrorKind {
126    #[error("Other error: {0}")]
127    Other(Box<dyn core::error::Error>),
128    #[error("Hardware not available")]
129    NotAvailable,
130    #[error("Broken pipe")]
131    BrokenPipe,
132    #[error("Invalid parameter: {name}")]
133    InvalidParameter { name: &'static str },
134    #[error("Invalid data")]
135    InvalidData,
136    #[error("Timed out")]
137    TimedOut,
138    /// This operation was interrupted.
139    ///
140    /// Interrupted operations can typically be retried.
141    #[error("Interrupted")]
142    Interrupted,
143    /// This operation is unsupported on this platform.
144    ///
145    /// This means that the operation can never succeed.
146    #[error("Unsupported")]
147    Unsupported,
148    /// An operation could not be completed, because it failed
149    /// to allocate enough memory.
150    #[error("Out of memory")]
151    OutOfMemory,
152    /// An attempted write could not write any data.
153    #[error("Write zero")]
154    WriteZero,
155}
156
#[cfg(test)]
mod test {
    // The mock devices below move at most `CHUNK` bytes per call and report
    // `Interrupted` when more was requested, which forces the `*_all`
    // helpers through their retry path.

    use super::*;

    /// Maximum number of bytes a mock device transfers in one call.
    const CHUNK: usize = 2;

    /// Shared mock read: sets up to `CHUNK` leading bytes of `buf` to `1`,
    /// then reports `Interrupted` if the buffer was larger than that.
    fn fill_chunk(buf: &mut [u8]) -> Result {
        for byte in buf.iter_mut().take(CHUNK) {
            *byte = 1;
        }
        if buf.len() > CHUNK {
            Err(Error {
                kind: ErrorKind::Interrupted,
                success_pos: CHUNK,
            })
        } else {
            Ok(())
        }
    }

    /// Reader that relies on the provided `read_all*` implementations.
    struct TRead;

    #[async_trait]
    impl Read for TRead {
        fn read(&mut self, buf: &mut [u8]) -> Result {
            fill_chunk(buf)
        }
    }

    /// Hand-written future used by `TRead2::read_all`.
    struct ARead<'a, 'b> {
        /// Bytes of `buf` filled so far.
        n: usize,
        /// Destination buffer.
        buf: &'a mut [u8],
        /// Device being read.
        p: &'b mut TRead2,
    }

    impl core::future::Future for ARead<'_, '_> {
        type Output = Result;

        fn poll(
            mut self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> Poll<Self::Output> {
            let ARead { n, buf, p } = &mut *self;

            let remaining = &mut buf[*n..];
            match p.read(remaining) {
                Ok(()) => *n += remaining.len(),
                Err(mut e) => {
                    *n += e.success_pos;
                    if !matches!(e.kind, ErrorKind::Interrupted) {
                        e.success_pos = *n;
                        return Poll::Ready(Err(e));
                    }
                }
            }

            if *n != buf.len() {
                // Not done yet: ask to be polled again.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(Ok(()))
        }
    }

    /// Reader that overrides `read_all` with a manually written future,
    /// spelling out the signature `#[async_trait]` would generate.
    struct TRead2;

    impl Read for TRead2 {
        /// Read data from the device.
        fn read(&mut self, buf: &mut [u8]) -> Result {
            fill_chunk(buf)
        }

        fn read_all<'life0, 'life1, 'async_trait>(
            &'life0 mut self,
            buf: &'life1 mut [u8],
        ) -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = Result> + 'async_trait + Send>>
        where
            'life0: 'async_trait,
            'life1: 'async_trait,
            Self: 'async_trait,
        {
            let fut = ARead { n: 0, buf, p: self };
            Box::pin(fut)
        }
    }

    /// Writer backed by a fixed eight-byte sink.
    struct TWrite {
        /// Bytes accepted so far.
        data: [u8; 8],
        /// Index of the next free slot in `data`.
        iter: usize,
    }

    impl TWrite {
        fn new() -> Self {
            Self {
                data: [0; 8],
                iter: 0,
            }
        }

        /// Stores one byte, or fails with `BrokenPipe` once the sink is full.
        fn put(&mut self, data: u8) -> core::result::Result<(), ErrorKind> {
            let slot = self
                .data
                .get_mut(self.iter)
                .ok_or(ErrorKind::BrokenPipe)?;
            *slot = data;
            self.iter += 1;
            Ok(())
        }
    }

    impl Write for TWrite {
        fn write(&mut self, buf: &[u8]) -> Result {
            for (done, &byte) in buf.iter().take(CHUNK).enumerate() {
                if let Err(kind) = self.put(byte) {
                    return Err(Error {
                        kind,
                        success_pos: done,
                    });
                }
            }

            if buf.len() > CHUNK {
                Err(Error {
                    kind: ErrorKind::Interrupted,
                    success_pos: CHUNK,
                })
            } else {
                Ok(())
            }
        }
    }

    #[test]
    fn test_r() {
        let mut read = TRead;
        let mut buf = [0; 8];
        read.read_all_blocking(&mut buf).unwrap();

        assert_eq!(buf, [1; 8]);
    }

    #[tokio::test]
    async fn test_async_r() {
        let task = tokio::spawn(async move {
            let mut buf = [0; 8];
            let mut read = TRead;
            read.read_all(&mut buf).await.unwrap();
            buf
        });
        let buf = task.await.unwrap();

        assert_eq!(buf, [1; 8]);
    }

    #[tokio::test]
    async fn test_async_r2() {
        let mut read = TRead2;
        let mut buf = [0; 8];
        read.read_all(&mut buf).await.unwrap();

        assert_eq!(buf, [1; 8]);
    }

    #[test]
    fn test_w() {
        let mut w = TWrite::new();
        let buf = [1; 8];
        w.write_all_blocking(&buf).unwrap();

        assert_eq!(buf, w.data);
    }

    #[tokio::test]
    async fn test_async_w() {
        let mut w = TWrite::new();
        let buf = [1; 8];
        w.write_all(&buf).await.unwrap();

        assert_eq!(buf, w.data);
    }
}