1use std::io::Error as IoError;
2use std::os::fd::BorrowedFd;
3use std::os::unix::io::{AsRawFd, RawFd};
4use std::thread::sleep;
5use thiserror::Error;
6
7use nix::Error as NixError;
8#[allow(unused)]
9use nix::libc::off_t;
10use nix::sys::sendfile::sendfile;
11
12use tracing::{debug, error, trace};
13
14use crate::task::spawn_blocking;
15
16use crate::file_slice::AsyncFileSlice;
17
18#[derive(Error, Debug)]
19pub enum SendFileError {
20 #[error("IO error: {source}")]
21 IoError {
22 #[from]
23 source: IoError,
24 },
25 #[error("Nix error: {source}")]
26 NixError {
27 #[from]
28 source: NixError,
29 },
30}
31
/// Target of a zero-copy transfer, identified only by its raw file descriptor.
///
/// Only the descriptor number is stored; the caller is responsible for keeping
/// the underlying object open while transfers are in progress.
pub struct ZeroCopy(RawFd);

impl ZeroCopy {
    /// Creates a `ZeroCopy` that writes to the descriptor of `fd`
    /// (for example a TCP stream).
    pub fn from<S: AsRawFd>(fd: &mut S) -> Self {
        let raw_fd = fd.as_raw_fd();
        ZeroCopy(raw_fd)
    }

    /// Creates a `ZeroCopy` directly from an already-obtained raw descriptor.
    pub fn raw(fd: RawFd) -> Self {
        ZeroCopy(fd)
    }
}
47
48impl ZeroCopy {
49 pub async fn copy_slice(&self, source: &AsyncFileSlice) -> Result<usize, SendFileError> {
50 let size = source.len();
51 let target_raw_fd = self.0;
52 let source_raw_fd = source.fd();
53
54 #[cfg(target_os = "linux")]
55 let ft = {
56 let offset = source.position() as off_t;
57
58 spawn_blocking(move || {
59 let mut total_transferred: usize = 0; let mut current_offset = offset;
61
62 let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
63 let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
64
65 loop {
66 let to_be_transfer = size as usize - total_transferred;
67
68 trace!(
69 "trying: zero copy source fd: {} offset: {} len: {}, target fd: {}",
70 source_raw_fd, current_offset, to_be_transfer, target_raw_fd
71 );
72
73 match sendfile(
74 target_fd,
75 source_fd,
76 Some(&mut current_offset),
77 to_be_transfer,
78 ) {
79 Ok(bytes_transferred) => {
80 trace!("bytes transferred: {}", bytes_transferred);
81 total_transferred += bytes_transferred;
82
83 if bytes_transferred == 0 {
85 return Ok(total_transferred);
86 }
87
88 if total_transferred < size as usize {
89 debug!(
90 "current transferred: {} less than total: {}, continuing",
91 total_transferred, size
92 );
93 } else {
94 trace!(
95 "actual: zero copy bytes transferred: {} out of {}, ",
96 bytes_transferred, size
97 );
98
99 return Ok(total_transferred);
100 }
101 }
102 Err(err) => match err {
103 nix::errno::Errno::EAGAIN => {
104 debug!(
105 "EAGAIN, continuing source: {},target: {}",
106 source_raw_fd, target_raw_fd
107 );
108 sleep(std::time::Duration::from_millis(10));
109 }
110 _ => {
111 error!("error sendfile: {}", err);
112 return Err(err.into());
113 }
114 },
115 }
116 }
117 })
118 };
119
120 #[cfg(target_os = "macos")]
121 let ft = {
122 let offset = source.position();
123 spawn_blocking(move || {
124 use nix::errno::Errno;
125
126 let mut total_transferred = 0;
127 let mut current_offset = offset;
128
129 let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
130 let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
131
132 loop {
133 let to_be_transfer = (size - total_transferred) as i64;
134
135 trace!(
136 "mac zero copy source fd: {} offset: {} len: {}, target: fd{}",
137 source_raw_fd, current_offset, to_be_transfer, target_raw_fd
138 );
139
140 let (res, bytes_transferred) = sendfile(
141 source_fd,
142 target_fd,
143 current_offset as i64,
144 Some(to_be_transfer),
145 None,
146 None,
147 );
148
149 trace!("mac zero copy bytes transferred: {}", bytes_transferred);
150
151 total_transferred += bytes_transferred as u64;
152 current_offset += bytes_transferred as u64;
153 match res {
154 Ok(_) => {
155 if bytes_transferred == 0 {
157 trace!("no more bytes transferred");
158 return Ok(total_transferred as usize);
159 }
160 if total_transferred < size {
161 debug!(
162 "current transferred: {} less than total: {}, continuing",
163 total_transferred, size
164 );
165 } else {
166 return Ok(total_transferred as usize);
167 }
168 }
169 Err(err) => {
170 if err == Errno::EAGAIN {
171 debug!("EAGAIN, try again");
172 sleep(std::time::Duration::from_millis(10));
173 } else {
174 error!("error sendfile: {}", err);
175 return Err(err.into());
176 }
177 }
178 }
179 }
180 })
181 };
182
183 ft.await
184 }
185}
186
#[cfg(test)]
mod tests {

    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::time;

    use futures_lite::future::zip;
    use futures_util::stream::StreamExt;
    use tracing::debug;

    use crate::file_slice::AsyncFileSlice;
    use crate::fs::AsyncFileExtension;
    use crate::net::TcpListener;
    use crate::net::tcp_stream::stream;
    use crate::timer::sleep;
    use crate::{fs::util as file_util, zero_copy::ZeroCopy};
    use futures_lite::AsyncReadExt;

    use super::SendFileError;

    /// Sends the whole 30-byte test file over loopback TCP and checks that the
    /// sender reports 30 bytes and the receiver reads 30 bytes.
    #[fluvio_future::test]
    async fn test_zero_copy_simple() {
        let port = portpicker::pick_unused_port().expect("No free ports left");
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        let server = async {
            #[allow(unused_mut)]
            let mut listener = TcpListener::bind(addr).await?;

            debug!("server: listening");
            let mut incoming = listener.incoming();
            if let Some(stream) = incoming.next().await {
                debug!("server: got connection. waiting");
                let mut tcp_stream = stream?;
                let mut buf = [0; 30];
                let len = tcp_stream.read(&mut buf).await?;
                assert_eq!(len, 30);
            } else {
                panic!("client should connect");
            }
            Ok(()) as Result<(), SendFileError>
        };

        let client = async {
            let file = file_util::open("test-data/apirequest.bin").await?;
            sleep(time::Duration::from_millis(100)).await;

            debug!("client: file loaded");
            let mut stream = stream(&addr).await?;
            debug!("client: connected to server");
            let f_slice = file.as_slice(0, None).await?;
            debug!("client: send back file using zero copy");
            let writer = ZeroCopy::from(&mut stream);
            let transfered = writer.copy_slice(&f_slice).await?;
            assert_eq!(transfered, 30);
            Ok(()) as Result<(), SendFileError>
        };

        // Surface failures from either side instead of silently discarding them.
        let (client_result, server_result) = zip(client, server).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }

    /// Sends a multi-megabyte file twice.
    ///
    /// The second iteration uses a slice longer than the file, so the transfer must
    /// stop at end of file and report only the file's length.
    #[fluvio_future::test]
    async fn test_zero_copy_large_size() {
        const MAX_BYTES: usize = 3000000;

        use futures_lite::AsyncWriteExt;
        use std::env::temp_dir;

        const TEST_ITERATION: u16 = 2;

        let port = portpicker::pick_unused_port().expect("No free ports left");
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        /// Writes a zero-filled file of exactly `MAX_BYTES` bytes to the temp dir.
        async fn init_file() {
            const CHUNK: usize = 1000;

            let temp_file = temp_dir().join("async_large");
            debug!("temp file: {:#?}", temp_file);
            let mut file = file_util::create(temp_file.clone())
                .await
                .expect("file creation");

            let bytes: Vec<u8> = vec![0; CHUNK];
            for _ in 0..MAX_BYTES / CHUNK {
                file.write_all(&bytes).await.expect("writing");
            }

            file.sync_all().await.expect("flushing");
            drop(file);
            debug!("finish creating large test file");
        }

        let server = async {
            let temp_file = temp_dir().join("async_large");
            let file = file_util::open(&temp_file).await.expect("re opening");
            let f_slice = file.as_slice(0, None).await.expect("filed opening");
            assert_eq!(f_slice.len(), MAX_BYTES as u64);

            let listener = TcpListener::bind(addr).await.expect("failed bind");

            debug!("server: listening");
            let mut incoming = listener.incoming();

            for i in 0..TEST_ITERATION {
                let stream = incoming.next().await.expect("client should connect");
                debug!("server {} got connection. waiting", i);

                let mut tcp_stream = stream.expect("stream");

                debug!(
                    "server {}, send back file using zero copy: {:#?}",
                    i,
                    f_slice.len()
                );

                let writer = ZeroCopy::from(&mut tcp_stream);

                if i == 0 {
                    let transferred = writer.copy_slice(&f_slice).await.expect("file slice");
                    assert_eq!(transferred, MAX_BYTES);
                } else {
                    // Slice deliberately extends past EOF; only the file's bytes are sent.
                    let slice2 = AsyncFileSlice::new(f_slice.fd(), 0, (MAX_BYTES * 2) as u64);
                    let transferred = writer.copy_slice(&slice2).await.expect("file slice");
                    assert_eq!(transferred, MAX_BYTES);
                }
            }
        };

        let client = async {
            sleep(time::Duration::from_millis(100)).await;
            debug!("client loop starting");

            for i in 0..TEST_ITERATION {
                debug!("client: Test loop: {}", i);
                let mut stream = stream(&addr).await?;
                debug!("client: {} connected trying to read", i);
                let mut buffer = Vec::with_capacity(MAX_BYTES);
                stream
                    .read_to_end(&mut buffer)
                    .await
                    .expect("no more buffer");
                // The server closes the connection after each transfer, so the
                // client must have received exactly the file's bytes.
                assert_eq!(buffer.len(), MAX_BYTES);
                debug!("client: {} test success", i);

                sleep(time::Duration::from_millis(10)).await;
            }

            Ok(()) as Result<(), SendFileError>
        };

        init_file().await;

        let (client_result, ()) = zip(client, server).await;
        client_result.expect("client failed");
    }

    /// Sends a 1000-byte slice over a 30-byte file and checks that only the file's
    /// 30 bytes are transferred.
    #[fluvio_future::test]
    async fn test_zero_copy_large_slace() {
        let port = portpicker::pick_unused_port().expect("No free ports left");
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        let server = async {
            #[allow(unused_mut)]
            let mut listener = TcpListener::bind(addr).await?;

            debug!("server: listening");
            let mut incoming = listener.incoming();
            if let Some(stream) = incoming.next().await {
                debug!("server: got connection. waiting");
                let mut tcp_stream = stream?;
                let mut buf = [0; 30];
                let len = tcp_stream.read(&mut buf).await?;
                assert_eq!(len, 30);
            } else {
                panic!("client should connect");
            }
            Ok(()) as Result<(), SendFileError>
        };

        let client = async {
            let file = file_util::open("test-data/apirequest.bin").await?;
            sleep(time::Duration::from_millis(100)).await;

            debug!("client: file loaded");
            let mut stream = stream(&addr).await?;
            debug!("client: connected to server");
            let f_slice = file.as_slice(0, None).await?;
            let max_slice = AsyncFileSlice::new(f_slice.fd(), 0, 1000);
            debug!("slice: {:#?}", max_slice);
            debug!("client: send back file using zero copy");
            let writer = ZeroCopy::from(&mut stream);
            let transfer = writer.copy_slice(&max_slice).await?;
            assert_eq!(transfer, 30);
            Ok(()) as Result<(), SendFileError>
        };

        // Surface failures from either side instead of silently discarding them.
        let (client_result, server_result) = zip(client, server).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }
}