Skip to main content

compio_driver/sys/iour/
op.rs

1use std::{
2    ffi::CString,
3    io,
4    marker::PhantomPinned,
5    os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6    pin::Pin,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
10use io_uring::{
11    opcode,
12    types::{Fd, FsyncFlags},
13};
14use pin_project_lite::pin_project;
15use socket2::{SockAddr, SockAddrStorage, socklen_t};
16
17use super::OpCode;
18pub use crate::sys::unix_op::*;
19use crate::{OpEntry, op::*, sys_slice::*, syscall};
20
21unsafe impl<
22    D: std::marker::Send + 'static,
23    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
24> OpCode for Asyncify<F, D>
25{
26    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
27        OpEntry::Blocking
28    }
29
30    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
31        let this = self.project();
32        let f = this
33            .f
34            .take()
35            .expect("the operate method could only be called once");
36        let BufResult(res, data) = f();
37        *this.data = Some(data);
38        res
39    }
40}
41
42unsafe impl<
43    S,
44    D: std::marker::Send + 'static,
45    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
46> OpCode for AsyncifyFd<S, F, D>
47{
48    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
49        OpEntry::Blocking
50    }
51
52    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
53        let this = self.project();
54        let f = this
55            .f
56            .take()
57            .expect("the operate method could only be called once");
58        let BufResult(res, data) = f(this.fd);
59        *this.data = Some(data);
60        res
61    }
62}
63
64unsafe impl OpCode for OpenFile {
65    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
66        opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
67            .flags(self.flags | libc::O_CLOEXEC)
68            .mode(self.mode)
69            .build()
70            .into()
71    }
72
73    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
74        self.call()
75    }
76}
77
78unsafe impl OpCode for CloseFile {
79    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
80        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
81            .build()
82            .into()
83    }
84
85    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
86        self.call()
87    }
88}
89
90unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
91    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
92        opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
93            .build()
94            .into()
95    }
96
97    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
98        self.call()
99    }
100}
101
102pin_project! {
103    /// Get metadata of an opened file.
104    pub struct FileStat<S> {
105        pub(crate) fd: S,
106        pub(crate) stat: Statx,
107    }
108}
109
110impl<S> FileStat<S> {
111    /// Create [`FileStat`].
112    pub fn new(fd: S) -> Self {
113        Self {
114            fd,
115            stat: unsafe { std::mem::zeroed() },
116        }
117    }
118}
119
120unsafe impl<S: AsFd> OpCode for FileStat<S> {
121    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
122        let this = self.project();
123        static EMPTY_NAME: &[u8] = b"\0";
124        opcode::Statx::new(
125            Fd(this.fd.as_fd().as_fd().as_raw_fd()),
126            EMPTY_NAME.as_ptr().cast(),
127            this.stat as *mut _ as _,
128        )
129        .flags(libc::AT_EMPTY_PATH)
130        .mask(statx_mask())
131        .build()
132        .into()
133    }
134
135    #[cfg(gnulinux)]
136    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
137        let this = self.project();
138        static EMPTY_NAME: &[u8] = b"\0";
139        let res = syscall!(libc::statx(
140            this.fd.as_fd().as_raw_fd(),
141            EMPTY_NAME.as_ptr().cast(),
142            libc::AT_EMPTY_PATH,
143            statx_mask(),
144            this.stat as *mut _ as _
145        ))?;
146        Ok(res as _)
147    }
148
149    #[cfg(not(gnulinux))]
150    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
151        let this = self.project();
152        let mut stat = unsafe { std::mem::zeroed() };
153        let res = syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut stat))?;
154        *this.stat = stat_to_statx(stat);
155        Ok(res as _)
156    }
157}
158
159impl<S> IntoInner for FileStat<S> {
160    type Inner = Stat;
161
162    fn into_inner(self) -> Self::Inner {
163        statx_to_stat(self.stat)
164    }
165}
166
167/// Get metadata from path.
168pub struct PathStat {
169    pub(crate) path: CString,
170    pub(crate) stat: Statx,
171    pub(crate) follow_symlink: bool,
172}
173
174impl PathStat {
175    /// Create [`PathStat`].
176    pub fn new(path: CString, follow_symlink: bool) -> Self {
177        Self {
178            path,
179            stat: unsafe { std::mem::zeroed() },
180            follow_symlink,
181        }
182    }
183}
184
185unsafe impl OpCode for PathStat {
186    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
187        let mut flags = libc::AT_EMPTY_PATH;
188        if !self.follow_symlink {
189            flags |= libc::AT_SYMLINK_NOFOLLOW;
190        }
191        opcode::Statx::new(
192            Fd(libc::AT_FDCWD),
193            self.path.as_ptr(),
194            std::ptr::addr_of_mut!(self.stat).cast(),
195        )
196        .flags(flags)
197        .mask(statx_mask())
198        .build()
199        .into()
200    }
201
202    #[cfg(gnulinux)]
203    fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
204        let mut flags = libc::AT_EMPTY_PATH;
205        if !self.follow_symlink {
206            flags |= libc::AT_SYMLINK_NOFOLLOW;
207        }
208        let res = syscall!(libc::statx(
209            libc::AT_FDCWD,
210            self.path.as_ptr(),
211            flags,
212            statx_mask(),
213            std::ptr::addr_of_mut!(self.stat).cast()
214        ))?;
215        Ok(res as _)
216    }
217
218    #[cfg(not(gnulinux))]
219    fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
220        let mut stat = unsafe { std::mem::zeroed() };
221        let res = if self.follow_symlink {
222            syscall!(libc::stat(self.path.as_ptr(), &mut stat))?
223        } else {
224            syscall!(libc::lstat(self.path.as_ptr(), &mut stat))?
225        };
226        self.stat = stat_to_statx(stat);
227        Ok(res as _)
228    }
229}
230
231impl IntoInner for PathStat {
232    type Inner = Stat;
233
234    fn into_inner(self) -> Self::Inner {
235        statx_to_stat(self.stat)
236    }
237}
238
239unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
240    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
241        let this = self.project();
242        let fd = Fd(this.fd.as_fd().as_raw_fd());
243        let slice = this.buffer.sys_slice_mut();
244        opcode::Read::new(
245            fd,
246            slice.ptr() as _,
247            slice.len().try_into().unwrap_or(u32::MAX),
248        )
249        .offset(*this.offset)
250        .build()
251        .into()
252    }
253}
254
255unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
256    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
257        let this = self.project();
258        *this.slices = this.buffer.sys_slices_mut();
259        opcode::Readv::new(
260            Fd(this.fd.as_fd().as_raw_fd()),
261            this.slices.as_ptr() as _,
262            this.slices.len().try_into().unwrap_or(u32::MAX),
263        )
264        .offset(*this.offset)
265        .build()
266        .into()
267    }
268}
269
270unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
271    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
272        let slice = self.buffer.as_init();
273        opcode::Write::new(
274            Fd(self.fd.as_fd().as_raw_fd()),
275            slice.as_ptr(),
276            slice.len().try_into().unwrap_or(u32::MAX),
277        )
278        .offset(self.offset)
279        .build()
280        .into()
281    }
282}
283
284unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
285    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
286        let this = self.project();
287        *this.slices = this.buffer.as_ref().sys_slices();
288        opcode::Writev::new(
289            Fd(this.fd.as_fd().as_raw_fd()),
290            this.slices.as_ptr() as _,
291            this.slices.len().try_into().unwrap_or(u32::MAX),
292        )
293        .offset(*this.offset)
294        .build()
295        .into()
296    }
297}
298
299unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
300    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
301        let fd = self.fd.as_fd().as_raw_fd();
302        let slice = self.project().buffer.sys_slice_mut();
303        opcode::Read::new(
304            Fd(fd),
305            slice.ptr() as _,
306            slice.len().try_into().unwrap_or(u32::MAX),
307        )
308        .build()
309        .into()
310    }
311}
312
313unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
314    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
315        let this = self.project();
316        *this.slices = this.buffer.sys_slices_mut();
317        opcode::Readv::new(
318            Fd(this.fd.as_fd().as_raw_fd()),
319            this.slices.as_ptr() as _,
320            this.slices.len().try_into().unwrap_or(u32::MAX),
321        )
322        .build()
323        .into()
324    }
325}
326
327unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
328    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
329        let slice = self.buffer.as_init();
330        opcode::Write::new(
331            Fd(self.fd.as_fd().as_raw_fd()),
332            slice.as_ptr(),
333            slice.len().try_into().unwrap_or(u32::MAX),
334        )
335        .build()
336        .into()
337    }
338}
339
340unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
341    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
342        let this = self.project();
343        *this.slices = this.buffer.as_ref().sys_slices();
344        opcode::Writev::new(
345            Fd(this.fd.as_fd().as_raw_fd()),
346            this.slices.as_ptr() as _,
347            this.slices.len().try_into().unwrap_or(u32::MAX),
348        )
349        .build()
350        .into()
351    }
352}
353
354unsafe impl<S: AsFd> OpCode for Sync<S> {
355    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
356        opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
357            .flags(if self.datasync {
358                FsyncFlags::DATASYNC
359            } else {
360                FsyncFlags::empty()
361            })
362            .build()
363            .into()
364    }
365}
366
367unsafe impl OpCode for Unlink {
368    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
369        opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
370            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
371            .build()
372            .into()
373    }
374
375    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
376        self.call()
377    }
378}
379
380unsafe impl OpCode for CreateDir {
381    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
382        opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
383            .mode(self.mode)
384            .build()
385            .into()
386    }
387
388    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
389        self.call()
390    }
391}
392
393unsafe impl OpCode for Rename {
394    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
395        opcode::RenameAt::new(
396            Fd(libc::AT_FDCWD),
397            self.old_path.as_ptr(),
398            Fd(libc::AT_FDCWD),
399            self.new_path.as_ptr(),
400        )
401        .build()
402        .into()
403    }
404
405    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
406        self.call()
407    }
408}
409
410unsafe impl OpCode for Symlink {
411    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
412        opcode::SymlinkAt::new(
413            Fd(libc::AT_FDCWD),
414            self.source.as_ptr(),
415            self.target.as_ptr(),
416        )
417        .build()
418        .into()
419    }
420
421    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
422        self.call()
423    }
424}
425
426unsafe impl OpCode for HardLink {
427    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
428        opcode::LinkAt::new(
429            Fd(libc::AT_FDCWD),
430            self.source.as_ptr(),
431            Fd(libc::AT_FDCWD),
432            self.target.as_ptr(),
433        )
434        .build()
435        .into()
436    }
437
438    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
439        self.call()
440    }
441}
442
443unsafe impl OpCode for CreateSocket {
444    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
445        opcode::Socket::new(
446            self.domain,
447            self.socket_type | libc::SOCK_CLOEXEC,
448            self.protocol,
449        )
450        .build()
451        .into()
452    }
453
454    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
455        Ok(syscall!(libc::socket(
456            self.domain,
457            self.socket_type | libc::SOCK_CLOEXEC,
458            self.protocol
459        ))? as _)
460    }
461}
462
463unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
464    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
465        opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
466            .build()
467            .into()
468    }
469
470    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
471        self.call()
472    }
473}
474
475unsafe impl OpCode for CloseSocket {
476    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
477        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
478            .build()
479            .into()
480    }
481
482    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
483        self.call()
484    }
485}
486
487unsafe impl<S: AsFd> OpCode for Accept<S> {
488    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
489        let this = self.project();
490        opcode::Accept::new(
491            Fd(this.fd.as_fd().as_raw_fd()),
492            unsafe { this.buffer.view_as::<libc::sockaddr>() },
493            this.addr_len,
494        )
495        .flags(libc::SOCK_CLOEXEC)
496        .build()
497        .into()
498    }
499
500    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
501        // SAFETY: fd is a valid fd returned from kernel
502        let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
503        *self.project().accepted_fd = Some(fd);
504    }
505}
506
507unsafe impl<S: AsFd> OpCode for Connect<S> {
508    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
509        opcode::Connect::new(
510            Fd(self.fd.as_fd().as_raw_fd()),
511            self.addr.as_ptr().cast(),
512            self.addr.len(),
513        )
514        .build()
515        .into()
516    }
517}
518
519unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
520    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
521        let fd = self.fd.as_fd().as_raw_fd();
522        let flags = self.flags;
523        let slice = self.project().buffer.sys_slice_mut();
524        opcode::Recv::new(
525            Fd(fd),
526            slice.ptr() as _,
527            slice.len().try_into().unwrap_or(u32::MAX),
528        )
529        .flags(flags)
530        .build()
531        .into()
532    }
533}
534
535unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
536    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
537        self.as_mut().set_msg();
538        let this = self.project();
539        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
540            .flags(*this.flags as _)
541            .build()
542            .into()
543    }
544}
545
546unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
547    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
548        let slice = self.buffer.as_init();
549        opcode::Send::new(
550            Fd(self.fd.as_fd().as_raw_fd()),
551            slice.as_ptr(),
552            slice.len().try_into().unwrap_or(u32::MAX),
553        )
554        .flags(self.flags)
555        .build()
556        .into()
557    }
558}
559
560unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
561    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
562        self.as_mut().set_msg();
563        let this = self.project();
564        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
565            .flags(*this.flags as _)
566            .build()
567            .into()
568    }
569}
570
571struct RecvFromHeader<S> {
572    pub(crate) fd: S,
573    pub(crate) addr: SockAddrStorage,
574    pub(crate) msg: libc::msghdr,
575    pub(crate) flags: i32,
576    _p: PhantomPinned,
577}
578
579impl<S> RecvFromHeader<S> {
580    pub fn new(fd: S, flags: i32) -> Self {
581        Self {
582            fd,
583            addr: SockAddrStorage::zeroed(),
584            msg: unsafe { std::mem::zeroed() },
585            flags,
586            _p: PhantomPinned,
587        }
588    }
589}
590
591impl<S: AsFd> RecvFromHeader<S> {
592    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
593        self.msg.msg_name = &mut self.addr as *mut _ as _;
594        self.msg.msg_namelen = self.addr.size_of() as _;
595        self.msg.msg_iov = slices.as_mut_ptr() as _;
596        self.msg.msg_iovlen = slices.len() as _;
597        opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
598            .flags(self.flags as _)
599            .build()
600            .into()
601    }
602
603    pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
604        (self.addr, self.msg.msg_namelen)
605    }
606}
607
608pin_project! {
609    /// Receive data and source address.
610    pub struct RecvFrom<T: IoBufMut, S> {
611        header: RecvFromHeader<S>,
612        #[pin]
613        buffer: T,
614        slice: Option<SysSlice>,
615    }
616}
617
618impl<T: IoBufMut, S> RecvFrom<T, S> {
619    /// Create [`RecvFrom`].
620    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
621        Self {
622            header: RecvFromHeader::new(fd, flags),
623            buffer,
624            slice: None,
625        }
626    }
627}
628
629unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
630    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
631        let this = self.project();
632        let slice = this.slice.insert(this.buffer.sys_slice_mut());
633        this.header.create_entry(std::slice::from_mut(slice))
634    }
635}
636
637impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
638    type Inner = (T, SockAddrStorage, socklen_t);
639
640    fn into_inner(self) -> Self::Inner {
641        let (addr, addr_len) = self.header.into_addr();
642        (self.buffer, addr, addr_len)
643    }
644}
645
646pin_project! {
647    /// Receive data and source address into vectored buffer.
648    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
649        header: RecvFromHeader<S>,
650        #[pin]
651        buffer: T,
652        slice: Vec<SysSlice>,
653    }
654}
655
656impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
657    /// Create [`RecvFromVectored`].
658    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
659        Self {
660            header: RecvFromHeader::new(fd, flags),
661            buffer,
662            slice: vec![],
663        }
664    }
665}
666
667unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
668    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
669        let this = self.project();
670        *this.slice = this.buffer.sys_slices_mut();
671        this.header.create_entry(this.slice)
672    }
673}
674
675impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
676    type Inner = (T, SockAddrStorage, socklen_t);
677
678    fn into_inner(self) -> Self::Inner {
679        let (addr, addr_len) = self.header.into_addr();
680        (self.buffer, addr, addr_len)
681    }
682}
683
684struct SendToHeader<S> {
685    pub(crate) fd: S,
686    pub(crate) addr: SockAddr,
687    pub(crate) msg: libc::msghdr,
688    pub(crate) flags: i32,
689    _p: PhantomPinned,
690}
691
692impl<S> SendToHeader<S> {
693    pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
694        Self {
695            fd,
696            addr,
697            msg: unsafe { std::mem::zeroed() },
698            flags,
699            _p: PhantomPinned,
700        }
701    }
702}
703
704impl<S: AsFd> SendToHeader<S> {
705    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
706        self.msg.msg_name = self.addr.as_ptr() as _;
707        self.msg.msg_namelen = self.addr.len();
708        self.msg.msg_iov = slices.as_mut_ptr() as _;
709        self.msg.msg_iovlen = slices.len() as _;
710        opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
711            .flags(self.flags as _)
712            .build()
713            .into()
714    }
715}
716
717pin_project! {
718    /// Send data to specified address.
719    pub struct SendTo<T: IoBuf, S> {
720        header: SendToHeader<S>,
721        #[pin]
722        buffer: T,
723        slice: Option<SysSlice>,
724    }
725}
726
727impl<T: IoBuf, S> SendTo<T, S> {
728    /// Create [`SendTo`].
729    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
730        Self {
731            header: SendToHeader::new(fd, addr, flags),
732            buffer,
733            slice: None,
734        }
735    }
736}
737
738unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
739    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
740        let this = self.project();
741        let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
742        this.header.create_entry(std::slice::from_mut(slice))
743    }
744}
745
746impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
747    type Inner = T;
748
749    fn into_inner(self) -> Self::Inner {
750        self.buffer
751    }
752}
753
754pin_project! {
755    /// Send data to specified address from vectored buffer.
756    pub struct SendToVectored<T: IoVectoredBuf, S> {
757        header: SendToHeader<S>,
758        #[pin]
759        buffer: T,
760        slice: Vec<SysSlice>,
761    }
762}
763
764impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
765    /// Create [`SendToVectored`].
766    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
767        Self {
768            header: SendToHeader::new(fd, addr, flags),
769            buffer,
770            slice: vec![],
771        }
772    }
773}
774
775unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
776    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
777        let this = self.project();
778        *this.slice = this.buffer.as_ref().sys_slices();
779        this.header.create_entry(this.slice)
780    }
781}
782
783impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
784    type Inner = T;
785
786    fn into_inner(self) -> Self::Inner {
787        self.buffer
788    }
789}
790
791unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
792    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
793        self.as_mut().set_msg();
794        let this = self.project();
795        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
796            .flags(*this.flags as _)
797            .build()
798            .into()
799    }
800}
801
802unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
803    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
804        self.as_mut().set_msg();
805        let this = self.project();
806        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
807            .flags(*this.flags as _)
808            .build()
809            .into()
810    }
811}
812
813unsafe impl<S: AsFd> OpCode for PollOnce<S> {
814    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
815        let flags = match self.interest {
816            Interest::Readable => libc::POLLIN,
817            Interest::Writable => libc::POLLOUT,
818        };
819        opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
820            .build()
821            .into()
822    }
823}
824
825unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
826    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
827        opcode::Splice::new(
828            Fd(self.fd_in.as_fd().as_raw_fd()),
829            self.offset_in,
830            Fd(self.fd_out.as_fd().as_raw_fd()),
831            self.offset_out,
832            self.len.try_into().unwrap_or(u32::MAX),
833        )
834        .flags(self.flags)
835        .build()
836        .into()
837    }
838
839    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
840        let mut offset_in = self.offset_in;
841        let mut offset_out = self.offset_out;
842        let offset_in_ptr = if offset_in < 0 {
843            std::ptr::null_mut()
844        } else {
845            &mut offset_in
846        };
847        let offset_out_ptr = if offset_out < 0 {
848            std::ptr::null_mut()
849        } else {
850            &mut offset_out
851        };
852        Ok(syscall!(libc::splice(
853            self.fd_in.as_fd().as_raw_fd(),
854            offset_in_ptr,
855            self.fd_out.as_fd().as_raw_fd(),
856            offset_out_ptr,
857            self.len,
858            self.flags,
859        ))? as _)
860    }
861}
862
863mod buf_ring {
864    use std::{
865        io,
866        marker::PhantomPinned,
867        os::fd::{AsFd, AsRawFd},
868        pin::Pin,
869        ptr,
870    };
871
872    use io_uring::{opcode, squeue::Flags, types::Fd};
873
874    use super::OpCode;
875    use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
876
877    /// Read a file at specified position into specified buffer.
878    #[derive(Debug)]
879    pub struct ReadManagedAt<S> {
880        pub(crate) fd: S,
881        pub(crate) offset: u64,
882        buffer_group: u16,
883        len: u32,
884        _p: PhantomPinned,
885    }
886
887    impl<S> ReadManagedAt<S> {
888        /// Create [`ReadManagedAt`].
889        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
890            #[cfg(fusion)]
891            let buffer_pool = buffer_pool.as_io_uring();
892            Ok(Self {
893                fd,
894                offset,
895                buffer_group: buffer_pool.buffer_group(),
896                len: len.try_into().map_err(|_| {
897                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
898                })?,
899                _p: PhantomPinned,
900            })
901        }
902    }
903
904    unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
905        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
906            let fd = Fd(self.fd.as_fd().as_raw_fd());
907            let offset = self.offset;
908            opcode::Read::new(fd, ptr::null_mut(), self.len)
909                .offset(offset)
910                .buf_group(self.buffer_group)
911                .build()
912                .flags(Flags::BUFFER_SELECT)
913                .into()
914        }
915    }
916
917    impl<S> TakeBuffer for ReadManagedAt<S> {
918        type Buffer<'a> = BorrowedBuffer<'a>;
919        type BufferPool = BufferPool;
920
921        fn take_buffer(
922            self,
923            buffer_pool: &Self::BufferPool,
924            result: io::Result<usize>,
925            buffer_id: u16,
926        ) -> io::Result<Self::Buffer<'_>> {
927            #[cfg(fusion)]
928            let buffer_pool = buffer_pool.as_io_uring();
929            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
930            // SAFETY: result is valid
931            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
932            #[cfg(fusion)]
933            let res = res.map(BorrowedBuffer::new_io_uring);
934            res
935        }
936    }
937
938    /// Receive data from remote.
939    pub struct ReadManaged<S> {
940        fd: S,
941        buffer_group: u16,
942        len: u32,
943        _p: PhantomPinned,
944    }
945
946    impl<S> ReadManaged<S> {
947        /// Create [`ReadManaged`].
948        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
949            #[cfg(fusion)]
950            let buffer_pool = buffer_pool.as_io_uring();
951            Ok(Self {
952                fd,
953                buffer_group: buffer_pool.buffer_group(),
954                len: len.try_into().map_err(|_| {
955                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
956                })?,
957                _p: PhantomPinned,
958            })
959        }
960    }
961
962    unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
963        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
964            let fd = self.fd.as_fd().as_raw_fd();
965            opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
966                .buf_group(self.buffer_group)
967                .build()
968                .flags(Flags::BUFFER_SELECT)
969                .into()
970        }
971    }
972
973    impl<S> TakeBuffer for ReadManaged<S> {
974        type Buffer<'a> = BorrowedBuffer<'a>;
975        type BufferPool = BufferPool;
976
977        fn take_buffer(
978            self,
979            buffer_pool: &Self::BufferPool,
980            result: io::Result<usize>,
981            buffer_id: u16,
982        ) -> io::Result<Self::Buffer<'_>> {
983            #[cfg(fusion)]
984            let buffer_pool = buffer_pool.as_io_uring();
985            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
986            // SAFETY: result is valid
987            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
988            #[cfg(fusion)]
989            let res = res.map(BorrowedBuffer::new_io_uring);
990            res
991        }
992    }
993
994    /// Receive data from remote.
995    pub struct RecvManaged<S> {
996        fd: S,
997        buffer_group: u16,
998        len: u32,
999        flags: i32,
1000        _p: PhantomPinned,
1001    }
1002
1003    impl<S> RecvManaged<S> {
1004        /// Create [`RecvManaged`].
1005        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
1006            #[cfg(fusion)]
1007            let buffer_pool = buffer_pool.as_io_uring();
1008            Ok(Self {
1009                fd,
1010                buffer_group: buffer_pool.buffer_group(),
1011                len: len.try_into().map_err(|_| {
1012                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
1013                })?,
1014                flags,
1015                _p: PhantomPinned,
1016            })
1017        }
1018    }
1019
1020    unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
1021        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
1022            let fd = self.fd.as_fd().as_raw_fd();
1023            opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
1024                .flags(self.flags)
1025                .buf_group(self.buffer_group)
1026                .build()
1027                .flags(Flags::BUFFER_SELECT)
1028                .into()
1029        }
1030    }
1031
1032    impl<S> TakeBuffer for RecvManaged<S> {
1033        type Buffer<'a> = BorrowedBuffer<'a>;
1034        type BufferPool = BufferPool;
1035
1036        fn take_buffer(
1037            self,
1038            buffer_pool: &Self::BufferPool,
1039            result: io::Result<usize>,
1040            buffer_id: u16,
1041        ) -> io::Result<Self::Buffer<'_>> {
1042            #[cfg(fusion)]
1043            let buffer_pool = buffer_pool.as_io_uring();
1044            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1045            // SAFETY: result is valid
1046            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
1047            #[cfg(fusion)]
1048            let res = res.map(BorrowedBuffer::new_io_uring);
1049            res
1050        }
1051    }
1052}
1053
1054pub use buf_ring::{ReadManaged, ReadManagedAt, RecvManaged};