1use std::{
2 ffi::CString,
3 io,
4 marker::PhantomPinned,
5 os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6 pin::Pin,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
10use io_uring::{
11 opcode,
12 types::{Fd, FsyncFlags},
13};
14use pin_project_lite::pin_project;
15use socket2::{SockAddr, SockAddrStorage, socklen_t};
16
17use super::OpCode;
18pub use crate::sys::unix_op::*;
19use crate::{OpEntry, op::*, sys_slice::*, syscall};
20
21unsafe impl<
22 D: std::marker::Send + 'static,
23 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
24> OpCode for Asyncify<F, D>
25{
26 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
27 OpEntry::Blocking
28 }
29
30 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
31 let this = self.project();
32 let f = this
33 .f
34 .take()
35 .expect("the operate method could only be called once");
36 let BufResult(res, data) = f();
37 *this.data = Some(data);
38 res
39 }
40}
41
42unsafe impl<
43 S,
44 D: std::marker::Send + 'static,
45 F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
46> OpCode for AsyncifyFd<S, F, D>
47{
48 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
49 OpEntry::Blocking
50 }
51
52 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
53 let this = self.project();
54 let f = this
55 .f
56 .take()
57 .expect("the operate method could only be called once");
58 let BufResult(res, data) = f(this.fd);
59 *this.data = Some(data);
60 res
61 }
62}
63
64unsafe impl OpCode for OpenFile {
65 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
66 opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
67 .flags(self.flags | libc::O_CLOEXEC)
68 .mode(self.mode)
69 .build()
70 .into()
71 }
72
73 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
74 self.call()
75 }
76}
77
78unsafe impl OpCode for CloseFile {
79 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
80 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
81 .build()
82 .into()
83 }
84
85 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
86 self.call()
87 }
88}
89
90unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
91 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
92 opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
93 .build()
94 .into()
95 }
96
97 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
98 self.call()
99 }
100}
101
102pin_project! {
103 pub struct FileStat<S> {
105 pub(crate) fd: S,
106 pub(crate) stat: Statx,
107 }
108}
109
110impl<S> FileStat<S> {
111 pub fn new(fd: S) -> Self {
113 Self {
114 fd,
115 stat: unsafe { std::mem::zeroed() },
116 }
117 }
118}
119
120unsafe impl<S: AsFd> OpCode for FileStat<S> {
121 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
122 let this = self.project();
123 static EMPTY_NAME: &[u8] = b"\0";
124 opcode::Statx::new(
125 Fd(this.fd.as_fd().as_fd().as_raw_fd()),
126 EMPTY_NAME.as_ptr().cast(),
127 this.stat as *mut _ as _,
128 )
129 .flags(libc::AT_EMPTY_PATH)
130 .mask(statx_mask())
131 .build()
132 .into()
133 }
134
135 #[cfg(gnulinux)]
136 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
137 let this = self.project();
138 static EMPTY_NAME: &[u8] = b"\0";
139 let res = syscall!(libc::statx(
140 this.fd.as_fd().as_raw_fd(),
141 EMPTY_NAME.as_ptr().cast(),
142 libc::AT_EMPTY_PATH,
143 statx_mask(),
144 this.stat as *mut _ as _
145 ))?;
146 Ok(res as _)
147 }
148
149 #[cfg(not(gnulinux))]
150 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
151 let this = self.project();
152 let mut stat = unsafe { std::mem::zeroed() };
153 let res = syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut stat))?;
154 *this.stat = stat_to_statx(stat);
155 Ok(res as _)
156 }
157}
158
159impl<S> IntoInner for FileStat<S> {
160 type Inner = Stat;
161
162 fn into_inner(self) -> Self::Inner {
163 statx_to_stat(self.stat)
164 }
165}
166
167pub struct PathStat {
169 pub(crate) path: CString,
170 pub(crate) stat: Statx,
171 pub(crate) follow_symlink: bool,
172}
173
174impl PathStat {
175 pub fn new(path: CString, follow_symlink: bool) -> Self {
177 Self {
178 path,
179 stat: unsafe { std::mem::zeroed() },
180 follow_symlink,
181 }
182 }
183}
184
185unsafe impl OpCode for PathStat {
186 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
187 let mut flags = libc::AT_EMPTY_PATH;
188 if !self.follow_symlink {
189 flags |= libc::AT_SYMLINK_NOFOLLOW;
190 }
191 opcode::Statx::new(
192 Fd(libc::AT_FDCWD),
193 self.path.as_ptr(),
194 std::ptr::addr_of_mut!(self.stat).cast(),
195 )
196 .flags(flags)
197 .mask(statx_mask())
198 .build()
199 .into()
200 }
201
202 #[cfg(gnulinux)]
203 fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
204 let mut flags = libc::AT_EMPTY_PATH;
205 if !self.follow_symlink {
206 flags |= libc::AT_SYMLINK_NOFOLLOW;
207 }
208 let res = syscall!(libc::statx(
209 libc::AT_FDCWD,
210 self.path.as_ptr(),
211 flags,
212 statx_mask(),
213 std::ptr::addr_of_mut!(self.stat).cast()
214 ))?;
215 Ok(res as _)
216 }
217
218 #[cfg(not(gnulinux))]
219 fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
220 let mut stat = unsafe { std::mem::zeroed() };
221 let res = if self.follow_symlink {
222 syscall!(libc::stat(self.path.as_ptr(), &mut stat))?
223 } else {
224 syscall!(libc::lstat(self.path.as_ptr(), &mut stat))?
225 };
226 self.stat = stat_to_statx(stat);
227 Ok(res as _)
228 }
229}
230
231impl IntoInner for PathStat {
232 type Inner = Stat;
233
234 fn into_inner(self) -> Self::Inner {
235 statx_to_stat(self.stat)
236 }
237}
238
239unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
240 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
241 let this = self.project();
242 let fd = Fd(this.fd.as_fd().as_raw_fd());
243 let slice = this.buffer.sys_slice_mut();
244 opcode::Read::new(
245 fd,
246 slice.ptr() as _,
247 slice.len().try_into().unwrap_or(u32::MAX),
248 )
249 .offset(*this.offset)
250 .build()
251 .into()
252 }
253}
254
255unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
256 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
257 let this = self.project();
258 *this.slices = this.buffer.sys_slices_mut();
259 opcode::Readv::new(
260 Fd(this.fd.as_fd().as_raw_fd()),
261 this.slices.as_ptr() as _,
262 this.slices.len().try_into().unwrap_or(u32::MAX),
263 )
264 .offset(*this.offset)
265 .build()
266 .into()
267 }
268}
269
270unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
271 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
272 let slice = self.buffer.as_init();
273 opcode::Write::new(
274 Fd(self.fd.as_fd().as_raw_fd()),
275 slice.as_ptr(),
276 slice.len().try_into().unwrap_or(u32::MAX),
277 )
278 .offset(self.offset)
279 .build()
280 .into()
281 }
282}
283
284unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
285 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
286 let this = self.project();
287 *this.slices = this.buffer.as_ref().sys_slices();
288 opcode::Writev::new(
289 Fd(this.fd.as_fd().as_raw_fd()),
290 this.slices.as_ptr() as _,
291 this.slices.len().try_into().unwrap_or(u32::MAX),
292 )
293 .offset(*this.offset)
294 .build()
295 .into()
296 }
297}
298
299unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
300 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
301 let fd = self.fd.as_fd().as_raw_fd();
302 let slice = self.project().buffer.sys_slice_mut();
303 opcode::Read::new(
304 Fd(fd),
305 slice.ptr() as _,
306 slice.len().try_into().unwrap_or(u32::MAX),
307 )
308 .build()
309 .into()
310 }
311}
312
313unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
314 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
315 let this = self.project();
316 *this.slices = this.buffer.sys_slices_mut();
317 opcode::Readv::new(
318 Fd(this.fd.as_fd().as_raw_fd()),
319 this.slices.as_ptr() as _,
320 this.slices.len().try_into().unwrap_or(u32::MAX),
321 )
322 .build()
323 .into()
324 }
325}
326
327unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
328 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
329 let slice = self.buffer.as_init();
330 opcode::Write::new(
331 Fd(self.fd.as_fd().as_raw_fd()),
332 slice.as_ptr(),
333 slice.len().try_into().unwrap_or(u32::MAX),
334 )
335 .build()
336 .into()
337 }
338}
339
340unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
341 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
342 let this = self.project();
343 *this.slices = this.buffer.as_ref().sys_slices();
344 opcode::Writev::new(
345 Fd(this.fd.as_fd().as_raw_fd()),
346 this.slices.as_ptr() as _,
347 this.slices.len().try_into().unwrap_or(u32::MAX),
348 )
349 .build()
350 .into()
351 }
352}
353
354unsafe impl<S: AsFd> OpCode for Sync<S> {
355 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
356 opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
357 .flags(if self.datasync {
358 FsyncFlags::DATASYNC
359 } else {
360 FsyncFlags::empty()
361 })
362 .build()
363 .into()
364 }
365}
366
367unsafe impl OpCode for Unlink {
368 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
369 opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
370 .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
371 .build()
372 .into()
373 }
374
375 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
376 self.call()
377 }
378}
379
380unsafe impl OpCode for CreateDir {
381 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
382 opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
383 .mode(self.mode)
384 .build()
385 .into()
386 }
387
388 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
389 self.call()
390 }
391}
392
393unsafe impl OpCode for Rename {
394 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
395 opcode::RenameAt::new(
396 Fd(libc::AT_FDCWD),
397 self.old_path.as_ptr(),
398 Fd(libc::AT_FDCWD),
399 self.new_path.as_ptr(),
400 )
401 .build()
402 .into()
403 }
404
405 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
406 self.call()
407 }
408}
409
410unsafe impl OpCode for Symlink {
411 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
412 opcode::SymlinkAt::new(
413 Fd(libc::AT_FDCWD),
414 self.source.as_ptr(),
415 self.target.as_ptr(),
416 )
417 .build()
418 .into()
419 }
420
421 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
422 self.call()
423 }
424}
425
426unsafe impl OpCode for HardLink {
427 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
428 opcode::LinkAt::new(
429 Fd(libc::AT_FDCWD),
430 self.source.as_ptr(),
431 Fd(libc::AT_FDCWD),
432 self.target.as_ptr(),
433 )
434 .build()
435 .into()
436 }
437
438 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
439 self.call()
440 }
441}
442
443unsafe impl OpCode for CreateSocket {
444 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
445 opcode::Socket::new(
446 self.domain,
447 self.socket_type | libc::SOCK_CLOEXEC,
448 self.protocol,
449 )
450 .build()
451 .into()
452 }
453
454 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
455 Ok(syscall!(libc::socket(
456 self.domain,
457 self.socket_type | libc::SOCK_CLOEXEC,
458 self.protocol
459 ))? as _)
460 }
461}
462
463unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
464 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
465 opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
466 .build()
467 .into()
468 }
469
470 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
471 self.call()
472 }
473}
474
475unsafe impl OpCode for CloseSocket {
476 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
477 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
478 .build()
479 .into()
480 }
481
482 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
483 self.call()
484 }
485}
486
487unsafe impl<S: AsFd> OpCode for Accept<S> {
488 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
489 let this = self.project();
490 opcode::Accept::new(
491 Fd(this.fd.as_fd().as_raw_fd()),
492 unsafe { this.buffer.view_as::<libc::sockaddr>() },
493 this.addr_len,
494 )
495 .flags(libc::SOCK_CLOEXEC)
496 .build()
497 .into()
498 }
499
500 unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
501 let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
503 *self.project().accepted_fd = Some(fd);
504 }
505}
506
507unsafe impl<S: AsFd> OpCode for Connect<S> {
508 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
509 opcode::Connect::new(
510 Fd(self.fd.as_fd().as_raw_fd()),
511 self.addr.as_ptr().cast(),
512 self.addr.len(),
513 )
514 .build()
515 .into()
516 }
517}
518
519unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
520 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
521 let fd = self.fd.as_fd().as_raw_fd();
522 let flags = self.flags;
523 let slice = self.project().buffer.sys_slice_mut();
524 opcode::Recv::new(
525 Fd(fd),
526 slice.ptr() as _,
527 slice.len().try_into().unwrap_or(u32::MAX),
528 )
529 .flags(flags)
530 .build()
531 .into()
532 }
533}
534
535unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
536 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
537 self.as_mut().set_msg();
538 let this = self.project();
539 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
540 .flags(*this.flags as _)
541 .build()
542 .into()
543 }
544}
545
546unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
547 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
548 let slice = self.buffer.as_init();
549 opcode::Send::new(
550 Fd(self.fd.as_fd().as_raw_fd()),
551 slice.as_ptr(),
552 slice.len().try_into().unwrap_or(u32::MAX),
553 )
554 .flags(self.flags)
555 .build()
556 .into()
557 }
558}
559
560unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
561 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
562 self.as_mut().set_msg();
563 let this = self.project();
564 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
565 .flags(*this.flags as _)
566 .build()
567 .into()
568 }
569}
570
571struct RecvFromHeader<S> {
572 pub(crate) fd: S,
573 pub(crate) addr: SockAddrStorage,
574 pub(crate) msg: libc::msghdr,
575 pub(crate) flags: i32,
576 _p: PhantomPinned,
577}
578
579impl<S> RecvFromHeader<S> {
580 pub fn new(fd: S, flags: i32) -> Self {
581 Self {
582 fd,
583 addr: SockAddrStorage::zeroed(),
584 msg: unsafe { std::mem::zeroed() },
585 flags,
586 _p: PhantomPinned,
587 }
588 }
589}
590
591impl<S: AsFd> RecvFromHeader<S> {
592 pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
593 self.msg.msg_name = &mut self.addr as *mut _ as _;
594 self.msg.msg_namelen = self.addr.size_of() as _;
595 self.msg.msg_iov = slices.as_mut_ptr() as _;
596 self.msg.msg_iovlen = slices.len() as _;
597 opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
598 .flags(self.flags as _)
599 .build()
600 .into()
601 }
602
603 pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
604 (self.addr, self.msg.msg_namelen)
605 }
606}
607
608pin_project! {
609 pub struct RecvFrom<T: IoBufMut, S> {
611 header: RecvFromHeader<S>,
612 #[pin]
613 buffer: T,
614 slice: Option<SysSlice>,
615 }
616}
617
618impl<T: IoBufMut, S> RecvFrom<T, S> {
619 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
621 Self {
622 header: RecvFromHeader::new(fd, flags),
623 buffer,
624 slice: None,
625 }
626 }
627}
628
629unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
630 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
631 let this = self.project();
632 let slice = this.slice.insert(this.buffer.sys_slice_mut());
633 this.header.create_entry(std::slice::from_mut(slice))
634 }
635}
636
637impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
638 type Inner = (T, SockAddrStorage, socklen_t);
639
640 fn into_inner(self) -> Self::Inner {
641 let (addr, addr_len) = self.header.into_addr();
642 (self.buffer, addr, addr_len)
643 }
644}
645
646pin_project! {
647 pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
649 header: RecvFromHeader<S>,
650 #[pin]
651 buffer: T,
652 slice: Vec<SysSlice>,
653 }
654}
655
656impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
657 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
659 Self {
660 header: RecvFromHeader::new(fd, flags),
661 buffer,
662 slice: vec![],
663 }
664 }
665}
666
667unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
668 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
669 let this = self.project();
670 *this.slice = this.buffer.sys_slices_mut();
671 this.header.create_entry(this.slice)
672 }
673}
674
675impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
676 type Inner = (T, SockAddrStorage, socklen_t);
677
678 fn into_inner(self) -> Self::Inner {
679 let (addr, addr_len) = self.header.into_addr();
680 (self.buffer, addr, addr_len)
681 }
682}
683
684struct SendToHeader<S> {
685 pub(crate) fd: S,
686 pub(crate) addr: SockAddr,
687 pub(crate) msg: libc::msghdr,
688 pub(crate) flags: i32,
689 _p: PhantomPinned,
690}
691
692impl<S> SendToHeader<S> {
693 pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
694 Self {
695 fd,
696 addr,
697 msg: unsafe { std::mem::zeroed() },
698 flags,
699 _p: PhantomPinned,
700 }
701 }
702}
703
704impl<S: AsFd> SendToHeader<S> {
705 pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
706 self.msg.msg_name = self.addr.as_ptr() as _;
707 self.msg.msg_namelen = self.addr.len();
708 self.msg.msg_iov = slices.as_mut_ptr() as _;
709 self.msg.msg_iovlen = slices.len() as _;
710 opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
711 .flags(self.flags as _)
712 .build()
713 .into()
714 }
715}
716
717pin_project! {
718 pub struct SendTo<T: IoBuf, S> {
720 header: SendToHeader<S>,
721 #[pin]
722 buffer: T,
723 slice: Option<SysSlice>,
724 }
725}
726
727impl<T: IoBuf, S> SendTo<T, S> {
728 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
730 Self {
731 header: SendToHeader::new(fd, addr, flags),
732 buffer,
733 slice: None,
734 }
735 }
736}
737
738unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
739 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
740 let this = self.project();
741 let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
742 this.header.create_entry(std::slice::from_mut(slice))
743 }
744}
745
746impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
747 type Inner = T;
748
749 fn into_inner(self) -> Self::Inner {
750 self.buffer
751 }
752}
753
754pin_project! {
755 pub struct SendToVectored<T: IoVectoredBuf, S> {
757 header: SendToHeader<S>,
758 #[pin]
759 buffer: T,
760 slice: Vec<SysSlice>,
761 }
762}
763
764impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
765 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
767 Self {
768 header: SendToHeader::new(fd, addr, flags),
769 buffer,
770 slice: vec![],
771 }
772 }
773}
774
775unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
776 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
777 let this = self.project();
778 *this.slice = this.buffer.as_ref().sys_slices();
779 this.header.create_entry(this.slice)
780 }
781}
782
783impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
784 type Inner = T;
785
786 fn into_inner(self) -> Self::Inner {
787 self.buffer
788 }
789}
790
791unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
792 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
793 self.as_mut().set_msg();
794 let this = self.project();
795 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
796 .flags(*this.flags as _)
797 .build()
798 .into()
799 }
800}
801
802unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
803 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
804 self.as_mut().set_msg();
805 let this = self.project();
806 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
807 .flags(*this.flags as _)
808 .build()
809 .into()
810 }
811}
812
813unsafe impl<S: AsFd> OpCode for PollOnce<S> {
814 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
815 let flags = match self.interest {
816 Interest::Readable => libc::POLLIN,
817 Interest::Writable => libc::POLLOUT,
818 };
819 opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
820 .build()
821 .into()
822 }
823}
824
825unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
826 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
827 opcode::Splice::new(
828 Fd(self.fd_in.as_fd().as_raw_fd()),
829 self.offset_in,
830 Fd(self.fd_out.as_fd().as_raw_fd()),
831 self.offset_out,
832 self.len.try_into().unwrap_or(u32::MAX),
833 )
834 .flags(self.flags)
835 .build()
836 .into()
837 }
838
839 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
840 let mut offset_in = self.offset_in;
841 let mut offset_out = self.offset_out;
842 let offset_in_ptr = if offset_in < 0 {
843 std::ptr::null_mut()
844 } else {
845 &mut offset_in
846 };
847 let offset_out_ptr = if offset_out < 0 {
848 std::ptr::null_mut()
849 } else {
850 &mut offset_out
851 };
852 Ok(syscall!(libc::splice(
853 self.fd_in.as_fd().as_raw_fd(),
854 offset_in_ptr,
855 self.fd_out.as_fd().as_raw_fd(),
856 offset_out_ptr,
857 self.len,
858 self.flags,
859 ))? as _)
860 }
861}
862
863mod buf_ring {
864 use std::{
865 io,
866 marker::PhantomPinned,
867 os::fd::{AsFd, AsRawFd},
868 pin::Pin,
869 ptr,
870 };
871
872 use io_uring::{opcode, squeue::Flags, types::Fd};
873
874 use super::OpCode;
875 use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
876
877 #[derive(Debug)]
879 pub struct ReadManagedAt<S> {
880 pub(crate) fd: S,
881 pub(crate) offset: u64,
882 buffer_group: u16,
883 len: u32,
884 _p: PhantomPinned,
885 }
886
887 impl<S> ReadManagedAt<S> {
888 pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
890 #[cfg(fusion)]
891 let buffer_pool = buffer_pool.as_io_uring();
892 Ok(Self {
893 fd,
894 offset,
895 buffer_group: buffer_pool.buffer_group(),
896 len: len.try_into().map_err(|_| {
897 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
898 })?,
899 _p: PhantomPinned,
900 })
901 }
902 }
903
904 unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
905 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
906 let fd = Fd(self.fd.as_fd().as_raw_fd());
907 let offset = self.offset;
908 opcode::Read::new(fd, ptr::null_mut(), self.len)
909 .offset(offset)
910 .buf_group(self.buffer_group)
911 .build()
912 .flags(Flags::BUFFER_SELECT)
913 .into()
914 }
915 }
916
917 impl<S> TakeBuffer for ReadManagedAt<S> {
918 type Buffer<'a> = BorrowedBuffer<'a>;
919 type BufferPool = BufferPool;
920
921 fn take_buffer(
922 self,
923 buffer_pool: &Self::BufferPool,
924 result: io::Result<usize>,
925 buffer_id: u16,
926 ) -> io::Result<Self::Buffer<'_>> {
927 #[cfg(fusion)]
928 let buffer_pool = buffer_pool.as_io_uring();
929 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
930 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
932 #[cfg(fusion)]
933 let res = res.map(BorrowedBuffer::new_io_uring);
934 res
935 }
936 }
937
938 pub struct ReadManaged<S> {
940 fd: S,
941 buffer_group: u16,
942 len: u32,
943 _p: PhantomPinned,
944 }
945
946 impl<S> ReadManaged<S> {
947 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
949 #[cfg(fusion)]
950 let buffer_pool = buffer_pool.as_io_uring();
951 Ok(Self {
952 fd,
953 buffer_group: buffer_pool.buffer_group(),
954 len: len.try_into().map_err(|_| {
955 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
956 })?,
957 _p: PhantomPinned,
958 })
959 }
960 }
961
962 unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
963 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
964 let fd = self.fd.as_fd().as_raw_fd();
965 opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
966 .buf_group(self.buffer_group)
967 .build()
968 .flags(Flags::BUFFER_SELECT)
969 .into()
970 }
971 }
972
973 impl<S> TakeBuffer for ReadManaged<S> {
974 type Buffer<'a> = BorrowedBuffer<'a>;
975 type BufferPool = BufferPool;
976
977 fn take_buffer(
978 self,
979 buffer_pool: &Self::BufferPool,
980 result: io::Result<usize>,
981 buffer_id: u16,
982 ) -> io::Result<Self::Buffer<'_>> {
983 #[cfg(fusion)]
984 let buffer_pool = buffer_pool.as_io_uring();
985 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
986 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
988 #[cfg(fusion)]
989 let res = res.map(BorrowedBuffer::new_io_uring);
990 res
991 }
992 }
993
994 pub struct RecvManaged<S> {
996 fd: S,
997 buffer_group: u16,
998 len: u32,
999 flags: i32,
1000 _p: PhantomPinned,
1001 }
1002
1003 impl<S> RecvManaged<S> {
1004 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1006 #[cfg(fusion)]
1007 let buffer_pool = buffer_pool.as_io_uring();
1008 Ok(Self {
1009 fd,
1010 buffer_group: buffer_pool.buffer_group(),
1011 len: len.try_into().map_err(|_| {
1012 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1013 })?,
1014 flags,
1015 _p: PhantomPinned,
1016 })
1017 }
1018 }
1019
1020 unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
1021 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1022 let fd = self.fd.as_fd().as_raw_fd();
1023 opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
1024 .flags(self.flags)
1025 .buf_group(self.buffer_group)
1026 .build()
1027 .flags(Flags::BUFFER_SELECT)
1028 .into()
1029 }
1030 }
1031
1032 impl<S> TakeBuffer for RecvManaged<S> {
1033 type Buffer<'a> = BorrowedBuffer<'a>;
1034 type BufferPool = BufferPool;
1035
1036 fn take_buffer(
1037 self,
1038 buffer_pool: &Self::BufferPool,
1039 result: io::Result<usize>,
1040 buffer_id: u16,
1041 ) -> io::Result<Self::Buffer<'_>> {
1042 #[cfg(fusion)]
1043 let buffer_pool = buffer_pool.as_io_uring();
1044 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1045 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
1047 #[cfg(fusion)]
1048 let res = res.map(BorrowedBuffer::new_io_uring);
1049 res
1050 }
1051 }
1052}
1053
1054pub use buf_ring::{ReadManaged, ReadManagedAt, RecvManaged};