fluvio_future/
zero_copy.rs

1use std::io::Error as IoError;
2use std::os::fd::BorrowedFd;
3use std::os::unix::io::{AsRawFd, RawFd};
4use std::thread::sleep;
5use thiserror::Error;
6
7#[allow(unused)]
8use nix::libc::off_t;
9use nix::sys::sendfile::sendfile;
10use nix::Error as NixError;
11
12use tracing::{debug, error, trace};
13
14use crate::task::spawn_blocking;
15
16use crate::file_slice::AsyncFileSlice;
17
18#[derive(Error, Debug)]
19pub enum SendFileError {
20    #[error("IO error: {source}")]
21    IoError {
22        #[from]
23        source: IoError,
24    },
25    #[error("Nix error: {source}")]
26    NixError {
27        #[from]
28        source: NixError,
29    },
30}
31
/// Zero-copy writer that targets a raw file descriptor.
///
/// Only the descriptor number is stored; no `Drop` logic is attached here, so
/// this type does not close the descriptor.
pub struct ZeroCopy(RawFd);

impl ZeroCopy {
    /// Creates a writer from any value that exposes a raw file descriptor.
    pub fn from<S: AsRawFd>(fd: &mut S) -> Self {
        let raw_fd = fd.as_raw_fd();
        Self::raw(raw_fd)
    }

    /// Creates a writer from an already obtained raw file descriptor.
    pub fn raw(fd: RawFd) -> Self {
        ZeroCopy(fd)
    }
}
47
48impl ZeroCopy {
49    pub async fn copy_slice(&self, source: &AsyncFileSlice) -> Result<usize, SendFileError> {
50        let size = source.len();
51        let target_raw_fd = self.0;
52        let source_raw_fd = source.fd();
53
54        #[cfg(target_os = "linux")]
55        let ft = {
56            let offset = source.position() as off_t;
57
58            spawn_blocking(move || {
59                let mut total_transferred: usize = 0; // total bytes transferred so far
60                let mut current_offset = offset;
61
62                let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
63                let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
64
65                loop {
66                    let to_be_transfer = size as usize - total_transferred;
67
68                    trace!(
69                        "trying: zero copy source fd: {} offset: {} len: {}, target fd: {}",
70                        source_raw_fd,
71                        current_offset,
72                        to_be_transfer,
73                        target_raw_fd
74                    );
75
76                    match sendfile(
77                        target_fd,
78                        source_fd,
79                        Some(&mut current_offset),
80                        to_be_transfer,
81                    ) {
82                        Ok(bytes_transferred) => {
83                            trace!("bytes transferred: {}", bytes_transferred);
84                            total_transferred += bytes_transferred;
85
86                            // zero bytes transferred means it's EOF
87                            if bytes_transferred == 0 {
88                                return Ok(total_transferred);
89                            }
90
91                            if total_transferred < size as usize {
92                                debug!(
93                                    "current transferred: {} less than total: {}, continuing",
94                                    total_transferred, size
95                                );
96                            } else {
97                                trace!(
98                                    "actual: zero copy bytes transferred: {} out of {}, ",
99                                    bytes_transferred,
100                                    size
101                                );
102
103                                return Ok(total_transferred);
104                            }
105                        }
106                        Err(err) => match err {
107                            nix::errno::Errno::EAGAIN => {
108                                debug!(
109                                    "EAGAIN, continuing source: {},target: {}",
110                                    source_raw_fd, target_raw_fd
111                                );
112                                sleep(std::time::Duration::from_millis(10));
113                            }
114                            _ => {
115                                error!("error sendfile: {}", err);
116                                return Err(err.into());
117                            }
118                        },
119                    }
120                }
121            })
122        };
123
124        #[cfg(target_os = "macos")]
125        let ft = {
126            let offset = source.position();
127            spawn_blocking(move || {
128                use nix::errno::Errno;
129
130                let mut total_transferred = 0;
131                let mut current_offset = offset;
132
133                let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
134                let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
135
136                loop {
137                    let to_be_transfer = (size - total_transferred) as i64;
138
139                    trace!(
140                        "mac zero copy source fd: {} offset: {} len: {}, target: fd{}",
141                        source_raw_fd,
142                        current_offset,
143                        to_be_transfer,
144                        target_raw_fd
145                    );
146
147                    let (res, bytes_transferred) = sendfile(
148                        source_fd,
149                        target_fd,
150                        current_offset as i64,
151                        Some(to_be_transfer),
152                        None,
153                        None,
154                    );
155
156                    trace!("mac zero copy bytes transferred: {}", bytes_transferred);
157
158                    total_transferred += bytes_transferred as u64;
159                    current_offset += bytes_transferred as u64;
160                    match res {
161                        Ok(_) => {
162                            // zero bytes transferred means it's EOF
163                            if bytes_transferred == 0 {
164                                trace!("no more bytes transferred");
165                                return Ok(total_transferred as usize);
166                            }
167                            if total_transferred < size {
168                                debug!(
169                                    "current transferred: {} less than total: {}, continuing",
170                                    total_transferred, size
171                                );
172                            } else {
173                                return Ok(total_transferred as usize);
174                            }
175                        }
176                        Err(err) => {
177                            if err == Errno::EAGAIN {
178                                debug!("EAGAIN, try again");
179                                sleep(std::time::Duration::from_millis(10));
180                            } else {
181                                error!("error sendfile: {}", err);
182                                return Err(err.into());
183                            }
184                        }
185                    }
186                }
187            })
188        };
189
190        ft.await
191    }
192}
193
194#[cfg(test)]
195mod tests {
196
197    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
198    use std::time;
199
200    use futures_lite::future::zip;
201    use futures_util::stream::StreamExt;
202    use tracing::debug;
203
204    use crate::file_slice::AsyncFileSlice;
205    use crate::fs::AsyncFileExtension;
206    use crate::net::tcp_stream::stream;
207    use crate::net::TcpListener;
208    use crate::timer::sleep;
209    use crate::{fs::util as file_util, zero_copy::ZeroCopy};
210    use futures_lite::AsyncReadExt;
211
212    use super::SendFileError;
213
214    #[fluvio_future::test]
215    async fn test_zero_copy_simple() {
216        let port = portpicker::pick_unused_port().expect("No free ports left");
217        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
218
219        // spawn tcp client and check contents
220        let server = async {
221            #[allow(unused_mut)]
222            let mut listener = TcpListener::bind(addr).await?;
223
224            debug!("server: listening");
225            let mut incoming = listener.incoming();
226            if let Some(stream) = incoming.next().await {
227                debug!("server: got connection. waiting");
228                let mut tcp_stream = stream?;
229                let mut buf = [0; 30];
230                let len = tcp_stream.read(&mut buf).await?;
231                assert_eq!(len, 30);
232            } else {
233                panic!("client should connect");
234            }
235            Ok(()) as Result<(), SendFileError>
236        };
237
238        let client = async {
239            let file = file_util::open("test-data/apirequest.bin").await?;
240            sleep(time::Duration::from_millis(100)).await;
241
242            debug!("client: file loaded");
243            let mut stream = stream(&addr).await?;
244            debug!("client: connected to server");
245            let f_slice = file.as_slice(0, None).await?;
246            debug!("client: send back file using zero copy");
247            let writer = ZeroCopy::from(&mut stream);
248            let transfered = writer.copy_slice(&f_slice).await?;
249            assert_eq!(transfered, 30);
250            Ok(()) as Result<(), SendFileError>
251        };
252
253        // read file and zero copy to tcp stream
254
255        let _ = zip(client, server).await;
256    }
257
258    #[fluvio_future::test]
259    async fn test_zero_copy_large_size() {
260        const MAX_BYTES: usize = 3000000;
261
262        use futures_lite::AsyncWriteExt;
263        use std::env::temp_dir;
264
265        const TEST_ITERATION: u16 = 2;
266
267        let port = portpicker::pick_unused_port().expect("No free ports left");
268        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
269
270        async fn init_file() {
271            let temp_file = temp_dir().join("async_large");
272            debug!("temp file: {:#?}", temp_file);
273            let mut file = file_util::create(temp_file.clone())
274                .await
275                .expect("file creation");
276
277            let bytes: Vec<u8> = vec![0; 1000];
278            for _ in 0..3000 {
279                file.write_all(&bytes).await.expect("writing");
280            }
281
282            file.sync_all().await.expect("flushing");
283            drop(file);
284            debug!("finish creating large test file");
285        }
286
287        // spawn tcp client and check contents
288        let server = async {
289            let temp_file = temp_dir().join("async_large");
290            // do zero copy
291            let file = file_util::open(&temp_file).await.expect("re opening");
292            let f_slice = file.as_slice(0, None).await.expect("filed opening");
293            assert_eq!(f_slice.len(), MAX_BYTES as u64);
294
295            let listener = TcpListener::bind(addr).await.expect("failed bind");
296
297            debug!("server: listening");
298            let mut incoming = listener.incoming();
299
300            for i in 0..TEST_ITERATION {
301                let stream = incoming.next().await.expect("client should connect");
302                debug!("server {} got connection. waiting", i);
303
304                let mut tcp_stream = stream.expect("stream");
305
306                debug!(
307                    "server {}, send back file using zero copy: {:#?}",
308                    i,
309                    f_slice.len()
310                );
311
312                let writer = ZeroCopy::from(&mut tcp_stream);
313
314                if i == 0 {
315                    let transferred = writer.copy_slice(&f_slice).await.expect("file slice");
316                    assert_eq!(transferred, MAX_BYTES);
317                } else {
318                    let slice2 = AsyncFileSlice::new(f_slice.fd(), 0, (MAX_BYTES * 2) as u64);
319                    let transferred = writer.copy_slice(&slice2).await.expect("file slice");
320                    assert_eq!(transferred, MAX_BYTES);
321                }
322            }
323        };
324
325        let client = async {
326            // wait 1 seconds to give server to be ready
327            sleep(time::Duration::from_millis(100)).await;
328            debug!("client loop starting");
329
330            for i in 0..TEST_ITERATION {
331                debug!("client: Test loop: {}", i);
332                let mut stream = stream(&addr).await?;
333                debug!("client: {} connected trying to read", i);
334                // let server send response
335
336                let mut buffer = Vec::with_capacity(MAX_BYTES);
337                stream
338                    .read_to_end(&mut buffer)
339                    .await
340                    .expect("no more buffer");
341                debug!("client: {} test success", i);
342
343                // sleep 10 milliseconds between request to keep tcp connection otherwise it may lead to EPIPE error
344                //
345                sleep(time::Duration::from_millis(10)).await;
346            }
347
348            Ok(()) as Result<(), SendFileError>
349        };
350
351        init_file().await;
352
353        // read file and zero copy to tcp stream
354        let _ = zip(client, server).await;
355    }
356
357    /// test zero copy when len is too large
358    #[fluvio_future::test]
359    async fn test_zero_copy_large_slace() {
360        let port = portpicker::pick_unused_port().expect("No free ports left");
361        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
362
363        // spawn tcp client and check contents
364        let server = async {
365            #[allow(unused_mut)]
366            let mut listener = TcpListener::bind(addr).await?;
367
368            debug!("server: listening");
369            let mut incoming = listener.incoming();
370            if let Some(stream) = incoming.next().await {
371                debug!("server: got connection. waiting");
372                let mut tcp_stream = stream?;
373                let mut buf = [0; 30];
374                let len = tcp_stream.read(&mut buf).await?;
375                assert_eq!(len, 30);
376            } else {
377                panic!("client should connect");
378            }
379            Ok(()) as Result<(), SendFileError>
380        };
381
382        let client = async {
383            let file = file_util::open("test-data/apirequest.bin").await?;
384            sleep(time::Duration::from_millis(100)).await;
385
386            debug!("client: file loaded");
387            let mut stream = stream(&addr).await?;
388            debug!("client: connected to server");
389            let f_slice = file.as_slice(0, None).await?;
390            let max_slice = AsyncFileSlice::new(f_slice.fd(), 0, 1000);
391            debug!("slice: {:#?}", max_slice);
392            debug!("client: send back file using zero copy");
393            let writer = ZeroCopy::from(&mut stream);
394            let transfer = writer.copy_slice(&max_slice).await?;
395            assert_eq!(transfer, 30);
396            Ok(()) as Result<(), SendFileError>
397        };
398
399        // read file and zero copy to tcp stream
400
401        let _ = zip(client, server).await;
402    }
403}