copy_double_buffered/
lib.rs

1#![cfg_attr(not(any(feature = "std", test)), no_std)]
2
3use embassy_futures::join::join;
4
5/// Copies from `read` to `write` in parallel
6/// The first chunk is read sequentially
7/// and then written to `write` while the next
8/// chunk is read concurrently and so furth
9/// until `read` returns 0
10pub async fn copy_double_buffered<'a, E: Sized>(
11    mut read: impl AsyncFnMut(&mut [u8]) -> Result<usize, E>,
12    mut write: impl AsyncFnMut(&[u8]) -> Result<(), E>,
13    mut buf_a: &'a mut [u8],
14    mut buf_b: &'a mut [u8],
15) -> Result<(), E> {
16    let mut read_a: usize = read(buf_a).await?;
17    let mut read_b = 0usize;
18    loop {
19        match (&mut read_a, &mut read_b) {
20            (read_a, 0) if *read_a > 0 => {
21                let res = join(read(&mut buf_b), write(&buf_a[..*read_a])).await;
22                *read_a = 0;
23                match res {
24                    (Ok(read), res) => {
25                        read_b = read;
26                        res?;
27                    }
28                    (res, _) => {
29                        res?;
30                    }
31                }
32            }
33            (0, read_b) if *read_b > 0 => {
34                let res = join(read(&mut buf_a), write(&buf_b[..*read_b])).await;
35                *read_b = 0;
36                match res {
37                    (Ok(read), res) => {
38                        read_a = read;
39                        res?;
40                    }
41                    (res, _) => {
42                        res?;
43                    }
44                }
45            }
46            (0, 0) => {
47                break Ok(());
48            }
49            (read_a, read_b) => {
50                write(&buf_a[..*read_a]).await?;
51                write(&buf_b[..*read_b]).await?;
52            }
53        }
54    }
55}
56
#[cfg(feature = "embedded-io-async")]
pub mod eia {

    /// Copies everything from `src` into `dst`, overlapping each write with
    /// the next read.
    ///
    /// This is [`crate::copy_double_buffered`] driven by `embedded_io_async`
    /// traits: chunks are pulled with `src.read(..)` and pushed with
    /// `dst.write_all(..)`. The first chunk is read on its own; afterwards each
    /// chunk is written to `dst` while the next one is read from `src`
    /// concurrently. `buf_a` and `buf_b` are the two scratch buffers used for
    /// that hand-over.
    ///
    /// This function does not call `flush` on `dst`.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by `src.read` or `dst.write_all`.
    ///
    /// # Examples
    ///
    /// The snippet needs an async context and this module in scope as `eia`,
    /// so it is not compiled as a doctest.
    ///
    /// ```rust,ignore
    /// async fn example() {
    ///     let mut src = [0u8; 1024 * 4];
    ///     // Generate some data
    ///     src.iter_mut()
    ///         .enumerate()
    ///         .for_each(|(i, v)| *v = (i % 255) as u8);
    ///     let mut dst: Vec<u8> = Vec::new();
    ///     let [mut buf_a, mut buf_b] = [[0u8; 16]; 2];
    ///     eia::copy_double_buffered(&src[..], &mut dst, &mut buf_a[..], &mut buf_b[..])
    ///         .await
    ///         .unwrap();
    ///     assert_eq!(&src[..], &dst[..]);
    /// }
    /// ```
    pub async fn copy_double_buffered<'a, R, W, E>(
        mut src: R,
        mut dst: W,
        buf_a: &'a mut [u8],
        buf_b: &'a mut [u8],
    ) -> Result<(), E>
    where
        R: embedded_io_async::Read<Error = E>,
        W: embedded_io_async::Write<Error = E>,
    {
        crate::copy_double_buffered(
            // Each closure takes ownership of its endpoint and forwards one
            // chunk per call.
            async move |buf| src.read(buf).await,
            async move |buf| dst.write_all(buf).await,
            buf_a,
            buf_b,
        )
        .await
    }
}
96
#[cfg(test)]
mod tests {
    use core::time::Duration;
    use std::time::Instant;

    use tokio::time::sleep;

    use super::*;

    /// Checks that the double-buffered copy reproduces the source exactly and
    /// that overlapping reads with writes beats a sequential read-then-write
    /// loop that uses the same chunk size and the same simulated latency.
    #[tokio::test]
    async fn copy_delayed() {
        /// Simulated latency of every single read and write, in milliseconds.
        /// Kept small so the whole test finishes in a couple of seconds.
        const DELAY: u64 = 10;
        /// Chunk size shared by both strategies so their timings are comparable.
        const CHUNK: usize = 64;

        let mut src = [0u8; 1024 * 4];
        src.iter_mut()
            .enumerate()
            .for_each(|(i, v)| *v = (i % 255) as u8);
        let mut dst: Vec<u8> = Vec::with_capacity(src.len());

        // Double-buffered copy: reads and writes overlap.
        let [mut buf_a, mut buf_b] = [[0u8; CHUNK]; 2];
        let begin = Instant::now();
        copy_double_buffered(
            {
                let mut src = &src[..];
                async move |buf| {
                    let read = core::cmp::min(buf.len(), src.len());
                    buf[..read].copy_from_slice(&src[..read]);
                    sleep(Duration::from_millis(DELAY)).await;
                    src = &src[read..];
                    Ok::<usize, ()>(read)
                }
            },
            async |buf| {
                dst.extend_from_slice(buf);
                sleep(Duration::from_millis(DELAY)).await;
                Ok::<(), ()>(())
            },
            &mut buf_a[..],
            &mut buf_b[..],
        )
        .await
        .unwrap();
        // Stop the clock before doing any verification work.
        let duration = begin.elapsed();
        assert_eq!(&src[..], &dst[..]);

        // Naive baseline: every read is followed by its write, nothing overlaps.
        dst.clear();
        let mut buf = [0u8; CHUNK];
        let naive_begin = Instant::now();
        {
            let mut src = &src[..];
            loop {
                let read = core::cmp::min(buf.len(), src.len());
                if read == 0 {
                    break;
                }
                buf[..read].copy_from_slice(&src[..read]);
                sleep(Duration::from_millis(DELAY)).await;
                src = &src[read..];
                // Only forward the bytes that were actually read.
                dst.extend_from_slice(&buf[..read]);
                sleep(Duration::from_millis(DELAY)).await;
            }
        }
        let naive_duration = naive_begin.elapsed();
        assert_eq!(&src[..], &dst[..]);

        // With N chunks the overlapped copy sleeps about N + 1 times and the
        // naive one 2 * N times, so the expected ratio is close to 1/2.
        // Requiring less than 3/4 leaves room for timer jitter while still
        // failing if the reads and writes stop overlapping.
        assert!(
            duration * 4 < naive_duration * 3,
            "double-buffered copy took {duration:?}, naive copy took {naive_duration:?}"
        );
    }
}
160}