fluvio_future/
zero_copy.rs

1use std::io::Error as IoError;
2use std::os::fd::BorrowedFd;
3use std::os::unix::io::{AsRawFd, RawFd};
4use std::thread::sleep;
5use thiserror::Error;
6
7use nix::Error as NixError;
8#[allow(unused)]
9use nix::libc::off_t;
10use nix::sys::sendfile::sendfile;
11
12use tracing::{debug, error, trace};
13
14use crate::task::spawn_blocking;
15
16use crate::file_slice::AsyncFileSlice;
17
18#[derive(Error, Debug)]
19pub enum SendFileError {
20    #[error("IO error: {source}")]
21    IoError {
22        #[from]
23        source: IoError,
24    },
25    #[error("Nix error: {source}")]
26    NixError {
27        #[from]
28        source: NixError,
29    },
30}
31
/// Zero-copy writer: wraps the raw file descriptor that data is sent to.
///
/// Only the descriptor number is stored; the object it was taken from is not
/// kept borrowed by this wrapper.
pub struct ZeroCopy(RawFd);

impl ZeroCopy {
    /// Wraps an already known raw descriptor.
    pub fn raw(fd: RawFd) -> Self {
        ZeroCopy(fd)
    }

    /// Builds a writer from anything that exposes a raw descriptor.
    ///
    /// The descriptor is read once through [`AsRawFd::as_raw_fd`]; `fd` itself
    /// is not retained.
    pub fn from<S: AsRawFd>(fd: &mut S) -> Self {
        Self::raw(fd.as_raw_fd())
    }
}
47
48impl ZeroCopy {
49    pub async fn copy_slice(&self, source: &AsyncFileSlice) -> Result<usize, SendFileError> {
50        let size = source.len();
51        let target_raw_fd = self.0;
52        let source_raw_fd = source.fd();
53
54        #[cfg(target_os = "linux")]
55        let ft = {
56            let offset = source.position() as off_t;
57
58            spawn_blocking(move || {
59                let mut total_transferred: usize = 0; // total bytes transferred so far
60                let mut current_offset = offset;
61
62                let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
63                let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
64
65                loop {
66                    let to_be_transfer = size as usize - total_transferred;
67
68                    trace!(
69                        "trying: zero copy source fd: {} offset: {} len: {}, target fd: {}",
70                        source_raw_fd, current_offset, to_be_transfer, target_raw_fd
71                    );
72
73                    match sendfile(
74                        target_fd,
75                        source_fd,
76                        Some(&mut current_offset),
77                        to_be_transfer,
78                    ) {
79                        Ok(bytes_transferred) => {
80                            trace!("bytes transferred: {}", bytes_transferred);
81                            total_transferred += bytes_transferred;
82
83                            // zero bytes transferred means it's EOF
84                            if bytes_transferred == 0 {
85                                return Ok(total_transferred);
86                            }
87
88                            if total_transferred < size as usize {
89                                debug!(
90                                    "current transferred: {} less than total: {}, continuing",
91                                    total_transferred, size
92                                );
93                            } else {
94                                trace!(
95                                    "actual: zero copy bytes transferred: {} out of {}, ",
96                                    bytes_transferred, size
97                                );
98
99                                return Ok(total_transferred);
100                            }
101                        }
102                        Err(err) => match err {
103                            nix::errno::Errno::EAGAIN => {
104                                debug!(
105                                    "EAGAIN, continuing source: {},target: {}",
106                                    source_raw_fd, target_raw_fd
107                                );
108                                sleep(std::time::Duration::from_millis(10));
109                            }
110                            _ => {
111                                error!("error sendfile: {}", err);
112                                return Err(err.into());
113                            }
114                        },
115                    }
116                }
117            })
118        };
119
120        #[cfg(target_os = "macos")]
121        let ft = {
122            let offset = source.position();
123            spawn_blocking(move || {
124                use nix::errno::Errno;
125
126                let mut total_transferred = 0;
127                let mut current_offset = offset;
128
129                let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
130                let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
131
132                loop {
133                    let to_be_transfer = (size - total_transferred) as i64;
134
135                    trace!(
136                        "mac zero copy source fd: {} offset: {} len: {}, target: fd{}",
137                        source_raw_fd, current_offset, to_be_transfer, target_raw_fd
138                    );
139
140                    let (res, bytes_transferred) = sendfile(
141                        source_fd,
142                        target_fd,
143                        current_offset as i64,
144                        Some(to_be_transfer),
145                        None,
146                        None,
147                    );
148
149                    trace!("mac zero copy bytes transferred: {}", bytes_transferred);
150
151                    total_transferred += bytes_transferred as u64;
152                    current_offset += bytes_transferred as u64;
153                    match res {
154                        Ok(_) => {
155                            // zero bytes transferred means it's EOF
156                            if bytes_transferred == 0 {
157                                trace!("no more bytes transferred");
158                                return Ok(total_transferred as usize);
159                            }
160                            if total_transferred < size {
161                                debug!(
162                                    "current transferred: {} less than total: {}, continuing",
163                                    total_transferred, size
164                                );
165                            } else {
166                                return Ok(total_transferred as usize);
167                            }
168                        }
169                        Err(err) => {
170                            if err == Errno::EAGAIN {
171                                debug!("EAGAIN, try again");
172                                sleep(std::time::Duration::from_millis(10));
173                            } else {
174                                error!("error sendfile: {}", err);
175                                return Err(err.into());
176                            }
177                        }
178                    }
179                }
180            })
181        };
182
183        ft.await
184    }
185}
186
187#[cfg(test)]
188mod tests {
189
190    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
191    use std::time;
192
193    use futures_lite::future::zip;
194    use futures_util::stream::StreamExt;
195    use tracing::debug;
196
197    use crate::file_slice::AsyncFileSlice;
198    use crate::fs::AsyncFileExtension;
199    use crate::net::TcpListener;
200    use crate::net::tcp_stream::stream;
201    use crate::timer::sleep;
202    use crate::{fs::util as file_util, zero_copy::ZeroCopy};
203    use futures_lite::AsyncReadExt;
204
205    use super::SendFileError;
206
207    #[fluvio_future::test]
208    async fn test_zero_copy_simple() {
209        let port = portpicker::pick_unused_port().expect("No free ports left");
210        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
211
212        // spawn tcp client and check contents
213        let server = async {
214            #[allow(unused_mut)]
215            let mut listener = TcpListener::bind(addr).await?;
216
217            debug!("server: listening");
218            let mut incoming = listener.incoming();
219            if let Some(stream) = incoming.next().await {
220                debug!("server: got connection. waiting");
221                let mut tcp_stream = stream?;
222                let mut buf = [0; 30];
223                let len = tcp_stream.read(&mut buf).await?;
224                assert_eq!(len, 30);
225            } else {
226                panic!("client should connect");
227            }
228            Ok(()) as Result<(), SendFileError>
229        };
230
231        let client = async {
232            let file = file_util::open("test-data/apirequest.bin").await?;
233            sleep(time::Duration::from_millis(100)).await;
234
235            debug!("client: file loaded");
236            let mut stream = stream(&addr).await?;
237            debug!("client: connected to server");
238            let f_slice = file.as_slice(0, None).await?;
239            debug!("client: send back file using zero copy");
240            let writer = ZeroCopy::from(&mut stream);
241            let transfered = writer.copy_slice(&f_slice).await?;
242            assert_eq!(transfered, 30);
243            Ok(()) as Result<(), SendFileError>
244        };
245
246        // read file and zero copy to tcp stream
247
248        let _ = zip(client, server).await;
249    }
250
251    #[fluvio_future::test]
252    async fn test_zero_copy_large_size() {
253        const MAX_BYTES: usize = 3000000;
254
255        use futures_lite::AsyncWriteExt;
256        use std::env::temp_dir;
257
258        const TEST_ITERATION: u16 = 2;
259
260        let port = portpicker::pick_unused_port().expect("No free ports left");
261        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
262
263        async fn init_file() {
264            let temp_file = temp_dir().join("async_large");
265            debug!("temp file: {:#?}", temp_file);
266            let mut file = file_util::create(temp_file.clone())
267                .await
268                .expect("file creation");
269
270            let bytes: Vec<u8> = vec![0; 1000];
271            for _ in 0..3000 {
272                file.write_all(&bytes).await.expect("writing");
273            }
274
275            file.sync_all().await.expect("flushing");
276            drop(file);
277            debug!("finish creating large test file");
278        }
279
280        // spawn tcp client and check contents
281        let server = async {
282            let temp_file = temp_dir().join("async_large");
283            // do zero copy
284            let file = file_util::open(&temp_file).await.expect("re opening");
285            let f_slice = file.as_slice(0, None).await.expect("filed opening");
286            assert_eq!(f_slice.len(), MAX_BYTES as u64);
287
288            let listener = TcpListener::bind(addr).await.expect("failed bind");
289
290            debug!("server: listening");
291            let mut incoming = listener.incoming();
292
293            for i in 0..TEST_ITERATION {
294                let stream = incoming.next().await.expect("client should connect");
295                debug!("server {} got connection. waiting", i);
296
297                let mut tcp_stream = stream.expect("stream");
298
299                debug!(
300                    "server {}, send back file using zero copy: {:#?}",
301                    i,
302                    f_slice.len()
303                );
304
305                let writer = ZeroCopy::from(&mut tcp_stream);
306
307                if i == 0 {
308                    let transferred = writer.copy_slice(&f_slice).await.expect("file slice");
309                    assert_eq!(transferred, MAX_BYTES);
310                } else {
311                    let slice2 = AsyncFileSlice::new(f_slice.fd(), 0, (MAX_BYTES * 2) as u64);
312                    let transferred = writer.copy_slice(&slice2).await.expect("file slice");
313                    assert_eq!(transferred, MAX_BYTES);
314                }
315            }
316        };
317
318        let client = async {
319            // wait 1 seconds to give server to be ready
320            sleep(time::Duration::from_millis(100)).await;
321            debug!("client loop starting");
322
323            for i in 0..TEST_ITERATION {
324                debug!("client: Test loop: {}", i);
325                let mut stream = stream(&addr).await?;
326                debug!("client: {} connected trying to read", i);
327                // let server send response
328
329                let mut buffer = Vec::with_capacity(MAX_BYTES);
330                stream
331                    .read_to_end(&mut buffer)
332                    .await
333                    .expect("no more buffer");
334                debug!("client: {} test success", i);
335
336                // sleep 10 milliseconds between request to keep tcp connection otherwise it may lead to EPIPE error
337                //
338                sleep(time::Duration::from_millis(10)).await;
339            }
340
341            Ok(()) as Result<(), SendFileError>
342        };
343
344        init_file().await;
345
346        // read file and zero copy to tcp stream
347        let _ = zip(client, server).await;
348    }
349
350    /// test zero copy when len is too large
351    #[fluvio_future::test]
352    async fn test_zero_copy_large_slace() {
353        let port = portpicker::pick_unused_port().expect("No free ports left");
354        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
355
356        // spawn tcp client and check contents
357        let server = async {
358            #[allow(unused_mut)]
359            let mut listener = TcpListener::bind(addr).await?;
360
361            debug!("server: listening");
362            let mut incoming = listener.incoming();
363            if let Some(stream) = incoming.next().await {
364                debug!("server: got connection. waiting");
365                let mut tcp_stream = stream?;
366                let mut buf = [0; 30];
367                let len = tcp_stream.read(&mut buf).await?;
368                assert_eq!(len, 30);
369            } else {
370                panic!("client should connect");
371            }
372            Ok(()) as Result<(), SendFileError>
373        };
374
375        let client = async {
376            let file = file_util::open("test-data/apirequest.bin").await?;
377            sleep(time::Duration::from_millis(100)).await;
378
379            debug!("client: file loaded");
380            let mut stream = stream(&addr).await?;
381            debug!("client: connected to server");
382            let f_slice = file.as_slice(0, None).await?;
383            let max_slice = AsyncFileSlice::new(f_slice.fd(), 0, 1000);
384            debug!("slice: {:#?}", max_slice);
385            debug!("client: send back file using zero copy");
386            let writer = ZeroCopy::from(&mut stream);
387            let transfer = writer.copy_slice(&max_slice).await?;
388            assert_eq!(transfer, 30);
389            Ok(()) as Result<(), SendFileError>
390        };
391
392        // read file and zero copy to tcp stream
393
394        let _ = zip(client, server).await;
395    }
396}