1use std::io::Error as IoError;
2use std::os::fd::BorrowedFd;
3use std::os::unix::io::{AsRawFd, RawFd};
4use std::thread::sleep;
5use thiserror::Error;
6
7#[allow(unused)]
8use nix::libc::off_t;
9use nix::sys::sendfile::sendfile;
10use nix::Error as NixError;
11
12use tracing::{debug, error, trace};
13
14use crate::task::spawn_blocking;
15
16use crate::file_slice::AsyncFileSlice;
17
18#[derive(Error, Debug)]
19pub enum SendFileError {
20 #[error("IO error: {source}")]
21 IoError {
22 #[from]
23 source: IoError,
24 },
25 #[error("Nix error: {source}")]
26 NixError {
27 #[from]
28 source: NixError,
29 },
30}
31
/// Writer that pushes file data into a target file descriptor without user-space copies.
///
/// Only the raw fd number is stored: `ZeroCopy` neither owns nor borrows the object the
/// descriptor came from, so the caller must keep that object open while this value is used.
pub struct ZeroCopy(RawFd);

impl ZeroCopy {
    /// Creates a `ZeroCopy` that targets the raw descriptor of `fd` (for example a TCP stream).
    pub fn from<S: AsRawFd>(fd: &mut S) -> Self {
        Self::raw(fd.as_raw_fd())
    }

    /// Creates a `ZeroCopy` from an already-extracted raw file descriptor.
    pub fn raw(fd: RawFd) -> Self {
        ZeroCopy(fd)
    }
}
47
48impl ZeroCopy {
49 pub async fn copy_slice(&self, source: &AsyncFileSlice) -> Result<usize, SendFileError> {
50 let size = source.len();
51 let target_raw_fd = self.0;
52 let source_raw_fd = source.fd();
53
54 #[cfg(target_os = "linux")]
55 let ft = {
56 let offset = source.position() as off_t;
57
58 spawn_blocking(move || {
59 let mut total_transferred: usize = 0; let mut current_offset = offset;
61
62 let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
63 let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
64
65 loop {
66 let to_be_transfer = size as usize - total_transferred;
67
68 trace!(
69 "trying: zero copy source fd: {} offset: {} len: {}, target fd: {}",
70 source_raw_fd,
71 current_offset,
72 to_be_transfer,
73 target_raw_fd
74 );
75
76 match sendfile(
77 target_fd,
78 source_fd,
79 Some(&mut current_offset),
80 to_be_transfer,
81 ) {
82 Ok(bytes_transferred) => {
83 trace!("bytes transferred: {}", bytes_transferred);
84 total_transferred += bytes_transferred;
85
86 if bytes_transferred == 0 {
88 return Ok(total_transferred);
89 }
90
91 if total_transferred < size as usize {
92 debug!(
93 "current transferred: {} less than total: {}, continuing",
94 total_transferred, size
95 );
96 } else {
97 trace!(
98 "actual: zero copy bytes transferred: {} out of {}, ",
99 bytes_transferred,
100 size
101 );
102
103 return Ok(total_transferred);
104 }
105 }
106 Err(err) => match err {
107 nix::errno::Errno::EAGAIN => {
108 debug!(
109 "EAGAIN, continuing source: {},target: {}",
110 source_raw_fd, target_raw_fd
111 );
112 sleep(std::time::Duration::from_millis(10));
113 }
114 _ => {
115 error!("error sendfile: {}", err);
116 return Err(err.into());
117 }
118 },
119 }
120 }
121 })
122 };
123
124 #[cfg(target_os = "macos")]
125 let ft = {
126 let offset = source.position();
127 spawn_blocking(move || {
128 use nix::errno::Errno;
129
130 let mut total_transferred = 0;
131 let mut current_offset = offset;
132
133 let target_fd = unsafe { BorrowedFd::borrow_raw(target_raw_fd) };
134 let source_fd = unsafe { BorrowedFd::borrow_raw(source_raw_fd) };
135
136 loop {
137 let to_be_transfer = (size - total_transferred) as i64;
138
139 trace!(
140 "mac zero copy source fd: {} offset: {} len: {}, target: fd{}",
141 source_raw_fd,
142 current_offset,
143 to_be_transfer,
144 target_raw_fd
145 );
146
147 let (res, bytes_transferred) = sendfile(
148 source_fd,
149 target_fd,
150 current_offset as i64,
151 Some(to_be_transfer),
152 None,
153 None,
154 );
155
156 trace!("mac zero copy bytes transferred: {}", bytes_transferred);
157
158 total_transferred += bytes_transferred as u64;
159 current_offset += bytes_transferred as u64;
160 match res {
161 Ok(_) => {
162 if bytes_transferred == 0 {
164 trace!("no more bytes transferred");
165 return Ok(total_transferred as usize);
166 }
167 if total_transferred < size {
168 debug!(
169 "current transferred: {} less than total: {}, continuing",
170 total_transferred, size
171 );
172 } else {
173 return Ok(total_transferred as usize);
174 }
175 }
176 Err(err) => {
177 if err == Errno::EAGAIN {
178 debug!("EAGAIN, try again");
179 sleep(std::time::Duration::from_millis(10));
180 } else {
181 error!("error sendfile: {}", err);
182 return Err(err.into());
183 }
184 }
185 }
186 }
187 })
188 };
189
190 ft.await
191 }
192}
193
#[cfg(test)]
mod tests {

    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::time;

    use futures_lite::future::zip;
    use futures_util::stream::StreamExt;
    use tracing::debug;

    use crate::file_slice::AsyncFileSlice;
    use crate::fs::AsyncFileExtension;
    use crate::net::tcp_stream::stream;
    use crate::net::TcpListener;
    use crate::timer::sleep;
    use crate::{fs::util as file_util, zero_copy::ZeroCopy};
    use futures_lite::AsyncReadExt;

    use super::SendFileError;

    /// Sends `test-data/apirequest.bin` over a local TCP connection with
    /// [`ZeroCopy::copy_slice`] and checks that 30 bytes are sent and received.
    ///
    /// Both halves return `Result`s that are checked after `zip`, so an error on either side
    /// fails the test instead of being discarded.
    #[fluvio_future::test]
    async fn test_zero_copy_simple() {
        let port = portpicker::pick_unused_port().expect("No free ports left");
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        let server = async {
            // NOTE(review): presumably `incoming()` needs `&mut self` on some runtime
            // configurations, hence the allowed `mut` — TODO confirm.
            #[allow(unused_mut)]
            let mut listener = TcpListener::bind(addr).await?;

            debug!("server: listening");
            let mut incoming = listener.incoming();
            if let Some(stream) = incoming.next().await {
                debug!("server: got connection. waiting");
                let mut tcp_stream = stream?;
                let mut buf = [0; 30];
                let len = tcp_stream.read(&mut buf).await?;
                assert_eq!(len, 30);
            } else {
                panic!("client should connect");
            }
            Ok(()) as Result<(), SendFileError>
        };

        let client = async {
            let file = file_util::open("test-data/apirequest.bin").await?;
            sleep(time::Duration::from_millis(100)).await;

            debug!("client: file loaded");
            let mut stream = stream(&addr).await?;
            debug!("client: connected to server");
            let f_slice = file.as_slice(0, None).await?;
            debug!("client: send back file using zero copy");
            let writer = ZeroCopy::from(&mut stream);
            let transferred = writer.copy_slice(&f_slice).await?;
            assert_eq!(transferred, 30);
            Ok(()) as Result<(), SendFileError>
        };

        let (client_result, server_result) = zip(client, server).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }

    /// Creates a `MAX_BYTES`-sized temp file and serves it `TEST_ITERATION` times with zero
    /// copy. The second iteration uses a slice twice the file length, which must still
    /// transfer exactly `MAX_BYTES` because `sendfile` stops at end of file.
    #[fluvio_future::test]
    async fn test_zero_copy_large_size() {
        const MAX_BYTES: usize = 3000000;

        use futures_lite::AsyncWriteExt;
        use std::env::temp_dir;

        const TEST_ITERATION: u16 = 2;

        let port = portpicker::pick_unused_port().expect("No free ports left");
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        /// Writes `MAX_BYTES` zero bytes to `<temp_dir>/async_large`.
        async fn init_file() {
            let temp_file = temp_dir().join("async_large");
            debug!("temp file: {:#?}", temp_file);
            let mut file = file_util::create(temp_file.clone())
                .await
                .expect("file creation");

            let bytes: Vec<u8> = vec![0; 1000];
            for _ in 0..MAX_BYTES / bytes.len() {
                file.write_all(&bytes).await.expect("writing");
            }

            file.sync_all().await.expect("flushing");
            drop(file);
            debug!("finish creating large test file");
        }

        let server = async {
            let temp_file = temp_dir().join("async_large");
            let file = file_util::open(&temp_file).await.expect("re opening");
            let f_slice = file.as_slice(0, None).await.expect("file opening");
            assert_eq!(f_slice.len(), MAX_BYTES as u64);

            let listener = TcpListener::bind(addr).await.expect("failed bind");

            debug!("server: listening");
            let mut incoming = listener.incoming();

            for i in 0..TEST_ITERATION {
                let stream = incoming.next().await.expect("client should connect");
                debug!("server {} got connection. waiting", i);

                // Dropped at the end of each iteration, which lets the client's
                // `read_to_end` observe EOF.
                let mut tcp_stream = stream.expect("stream");

                debug!(
                    "server {}, send back file using zero copy: {:#?}",
                    i,
                    f_slice.len()
                );

                let writer = ZeroCopy::from(&mut tcp_stream);

                if i == 0 {
                    let transferred = writer.copy_slice(&f_slice).await.expect("file slice");
                    assert_eq!(transferred, MAX_BYTES);
                } else {
                    let slice2 = AsyncFileSlice::new(f_slice.fd(), 0, (MAX_BYTES * 2) as u64);
                    let transferred = writer.copy_slice(&slice2).await.expect("file slice");
                    assert_eq!(transferred, MAX_BYTES);
                }
            }
        };

        let client = async {
            sleep(time::Duration::from_millis(100)).await;
            debug!("client loop starting");

            for i in 0..TEST_ITERATION {
                debug!("client: Test loop: {}", i);
                let mut stream = stream(&addr).await?;
                debug!("client: {} connected trying to read", i);
                let mut buffer = Vec::with_capacity(MAX_BYTES);
                stream
                    .read_to_end(&mut buffer)
                    .await
                    .expect("no more buffer");
                assert_eq!(buffer.len(), MAX_BYTES);
                debug!("client: {} test success", i);

                sleep(time::Duration::from_millis(10)).await;
            }

            Ok(()) as Result<(), SendFileError>
        };

        init_file().await;

        let (client_result, _) = zip(client, server).await;
        client_result.expect("client failed");
    }

    /// Requests a 1000-byte slice of the 30-byte `test-data/apirequest.bin` fixture and
    /// checks that only the 30 available bytes are transferred.
    #[fluvio_future::test]
    async fn test_zero_copy_large_slice() {
        let port = portpicker::pick_unused_port().expect("No free ports left");
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);

        let server = async {
            // NOTE(review): presumably `incoming()` needs `&mut self` on some runtime
            // configurations, hence the allowed `mut` — TODO confirm.
            #[allow(unused_mut)]
            let mut listener = TcpListener::bind(addr).await?;

            debug!("server: listening");
            let mut incoming = listener.incoming();
            if let Some(stream) = incoming.next().await {
                debug!("server: got connection. waiting");
                let mut tcp_stream = stream?;
                let mut buf = [0; 30];
                let len = tcp_stream.read(&mut buf).await?;
                assert_eq!(len, 30);
            } else {
                panic!("client should connect");
            }
            Ok(()) as Result<(), SendFileError>
        };

        let client = async {
            let file = file_util::open("test-data/apirequest.bin").await?;
            sleep(time::Duration::from_millis(100)).await;

            debug!("client: file loaded");
            let mut stream = stream(&addr).await?;
            debug!("client: connected to server");
            let f_slice = file.as_slice(0, None).await?;
            let max_slice = AsyncFileSlice::new(f_slice.fd(), 0, 1000);
            debug!("slice: {:#?}", max_slice);
            debug!("client: send back file using zero copy");
            let writer = ZeroCopy::from(&mut stream);
            let transfer = writer.copy_slice(&max_slice).await?;
            assert_eq!(transfer, 30);
            Ok(()) as Result<(), SendFileError>
        };

        let (client_result, server_result) = zip(client, server).await;
        client_result.expect("client failed");
        server_result.expect("server failed");
    }
}