copy_double_buffered/
lib.rs1#![cfg_attr(not(any(feature = "std", test)), no_std)]
2
3use embassy_futures::join::join;
4
5pub async fn copy_double_buffered<'a, E: Sized>(
11 mut read: impl AsyncFnMut(&mut [u8]) -> Result<usize, E>,
12 mut write: impl AsyncFnMut(&[u8]) -> Result<(), E>,
13 mut buf_a: &'a mut [u8],
14 mut buf_b: &'a mut [u8],
15) -> Result<(), E> {
16 let mut read_a: usize = read(buf_a).await?;
17 let mut read_b = 0usize;
18 loop {
19 match (&mut read_a, &mut read_b) {
20 (read_a, 0) if *read_a > 0 => {
21 let res = join(read(&mut buf_b), write(&buf_a[..*read_a])).await;
22 *read_a = 0;
23 match res {
24 (Ok(read), res) => {
25 read_b = read;
26 res?;
27 }
28 (res, _) => {
29 res?;
30 }
31 }
32 }
33 (0, read_b) if *read_b > 0 => {
34 let res = join(read(&mut buf_a), write(&buf_b[..*read_b])).await;
35 *read_b = 0;
36 match res {
37 (Ok(read), res) => {
38 read_a = read;
39 res?;
40 }
41 (res, _) => {
42 res?;
43 }
44 }
45 }
46 (0, 0) => {
47 break Ok(());
48 }
49 (read_a, read_b) => {
50 write(&buf_a[..*read_a]).await?;
51 write(&buf_b[..*read_b]).await?;
52 }
53 }
54 }
55}
56
57#[cfg(feature = "embedded-io-async")]
58pub mod eia {
59
60 pub async fn copy_double_buffered<'a, R, W, E>(
78 mut src: R,
79 mut dst: W,
80 buf_a: &'a mut [u8],
81 buf_b: &'a mut [u8],
82 ) -> Result<(), E>
83 where
84 R: embedded_io_async::Read<Error = E>,
85 W: embedded_io_async::Write<Error = E>,
86 {
87 crate::copy_double_buffered(
88 async move |buf| src.read(buf).await,
89 async move |buf| dst.write_all(buf).await,
90 buf_a,
91 buf_b,
92 )
93 .await
94 }
95}
96
97#[cfg(test)]
98mod tests {
99 use core::time::Duration;
100 use std::time::Instant;
101
102 use tokio::time::sleep;
103
104 use super::*;
105
106 #[tokio::test]
107 async fn copy_delayed() {
108 let mut src = [0u8; 1024 * 4];
109 src.iter_mut()
110 .enumerate()
111 .for_each(|(i, v)| *v = (i % 255) as u8);
112 let mut dst: Vec<u8> = Vec::new();
113 let [mut buf_a, mut buf_b] = [[0u8; 64]; 2];
114 const DELAY: u64 = 100;
115 let begin = Instant::now();
116 crate::copy_double_buffered(
117 {
118 let mut src = &src[..];
119 async move |buf| {
120 let read = core::cmp::min(buf.len(), src.len());
121 buf[..read].copy_from_slice(&src[..read]);
122 sleep(Duration::from_millis(DELAY)).await;
123 src = &src[read..];
124 Ok::<usize, ()>(read)
125 }
126 },
127 async |buf| {
128 dst.extend_from_slice(buf);
129 sleep(Duration::from_millis(DELAY)).await;
130 Ok::<(), ()>(())
131 },
132 &mut buf_a[..],
133 &mut buf_b[..],
134 )
135 .await
136 .unwrap();
137 assert_eq!(&src[..], &dst[..]);
138 dst.clear();
139 let duration = Instant::now() - begin;
140 let naive_begin = Instant::now();
141 let mut buf = [0u8; 16];
142 {
143 let mut src = &src[..];
144 loop {
145 let read = core::cmp::min(buf.len(), src.len());
146 if read == 0 {
147 break;
148 }
149 buf[..read].copy_from_slice(&src[..read]);
150 sleep(Duration::from_millis(DELAY)).await;
151 src = &src[read..];
152 dst.extend_from_slice(&buf[..]);
153 sleep(Duration::from_millis(DELAY)).await;
154 }
155 }
156 let native_duration = Instant::now() - naive_begin;
157 dbg!((duration, native_duration));
158 assert!(duration * 2 < native_duration * 3);
159 }
160}