Skip to main content

compio_driver/sys/iour/
op.rs

1use std::{
2    ffi::CString,
3    io,
4    marker::PhantomPinned,
5    os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6    pin::Pin,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
10use io_uring::{
11    opcode,
12    types::{Fd, FsyncFlags},
13};
14use pin_project_lite::pin_project;
15use socket2::{SockAddr, SockAddrStorage, socklen_t};
16
17use super::OpCode;
18pub use crate::sys::unix_op::*;
19use crate::{OpEntry, op::*, sys_slice::*, syscall};
20
21impl<
22    D: std::marker::Send + 'static,
23    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
24> OpCode for Asyncify<F, D>
25{
26    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
27        OpEntry::Blocking
28    }
29
30    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
31        let this = self.project();
32        let f = this
33            .f
34            .take()
35            .expect("the operate method could only be called once");
36        let BufResult(res, data) = f();
37        *this.data = Some(data);
38        res
39    }
40}
41
42impl<
43    S,
44    D: std::marker::Send + 'static,
45    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
46> OpCode for AsyncifyFd<S, F, D>
47{
48    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
49        OpEntry::Blocking
50    }
51
52    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
53        let this = self.project();
54        let f = this
55            .f
56            .take()
57            .expect("the operate method could only be called once");
58        let BufResult(res, data) = f(this.fd);
59        *this.data = Some(data);
60        res
61    }
62}
63
64impl OpCode for OpenFile {
65    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
66        opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
67            .flags(self.flags | libc::O_CLOEXEC)
68            .mode(self.mode)
69            .build()
70            .into()
71    }
72}
73
74impl OpCode for CloseFile {
75    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
76        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
77            .build()
78            .into()
79    }
80}
81
82impl<S: AsFd> OpCode for TruncateFile<S> {
83    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
84        if super::is_op_supported(opcode::Ftruncate::CODE) {
85            return opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
86                .build()
87                .into();
88        }
89
90        OpEntry::Blocking
91    }
92
93    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
94        self.truncate()
95    }
96}
97
98pin_project! {
99    /// Get metadata of an opened file.
100    pub struct FileStat<S> {
101        pub(crate) fd: S,
102        pub(crate) stat: Statx,
103    }
104}
105
106impl<S> FileStat<S> {
107    /// Create [`FileStat`].
108    pub fn new(fd: S) -> Self {
109        Self {
110            fd,
111            stat: unsafe { std::mem::zeroed() },
112        }
113    }
114}
115
116impl<S: AsFd> OpCode for FileStat<S> {
117    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
118        let this = self.project();
119        static EMPTY_NAME: &[u8] = b"\0";
120        opcode::Statx::new(
121            Fd(this.fd.as_fd().as_fd().as_raw_fd()),
122            EMPTY_NAME.as_ptr().cast(),
123            this.stat as *mut _ as _,
124        )
125        .flags(libc::AT_EMPTY_PATH)
126        .mask(statx_mask())
127        .build()
128        .into()
129    }
130}
131
132impl<S> IntoInner for FileStat<S> {
133    type Inner = Stat;
134
135    fn into_inner(self) -> Self::Inner {
136        statx_to_stat(self.stat)
137    }
138}
139
140/// Get metadata from path.
141pub struct PathStat {
142    pub(crate) path: CString,
143    pub(crate) stat: Statx,
144    pub(crate) follow_symlink: bool,
145}
146
147impl PathStat {
148    /// Create [`PathStat`].
149    pub fn new(path: CString, follow_symlink: bool) -> Self {
150        Self {
151            path,
152            stat: unsafe { std::mem::zeroed() },
153            follow_symlink,
154        }
155    }
156}
157
158impl OpCode for PathStat {
159    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
160        let mut flags = libc::AT_EMPTY_PATH;
161        if !self.follow_symlink {
162            flags |= libc::AT_SYMLINK_NOFOLLOW;
163        }
164        opcode::Statx::new(
165            Fd(libc::AT_FDCWD),
166            self.path.as_ptr(),
167            std::ptr::addr_of_mut!(self.stat).cast(),
168        )
169        .flags(flags)
170        .mask(statx_mask())
171        .build()
172        .into()
173    }
174}
175
176impl IntoInner for PathStat {
177    type Inner = Stat;
178
179    fn into_inner(self) -> Self::Inner {
180        statx_to_stat(self.stat)
181    }
182}
183
184impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
185    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
186        let this = self.project();
187        let fd = Fd(this.fd.as_fd().as_raw_fd());
188        let slice = this.buffer.sys_slice_mut();
189        opcode::Read::new(
190            fd,
191            slice.ptr() as _,
192            slice.len().try_into().unwrap_or(u32::MAX),
193        )
194        .offset(*this.offset)
195        .build()
196        .into()
197    }
198}
199
200impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
201    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
202        let this = self.project();
203        *this.slices = this.buffer.sys_slices_mut();
204        opcode::Readv::new(
205            Fd(this.fd.as_fd().as_raw_fd()),
206            this.slices.as_ptr() as _,
207            this.slices.len().try_into().unwrap_or(u32::MAX),
208        )
209        .offset(*this.offset)
210        .build()
211        .into()
212    }
213}
214
215impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
216    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
217        let slice = self.buffer.as_init();
218        opcode::Write::new(
219            Fd(self.fd.as_fd().as_raw_fd()),
220            slice.as_ptr(),
221            slice.len().try_into().unwrap_or(u32::MAX),
222        )
223        .offset(self.offset)
224        .build()
225        .into()
226    }
227}
228
229impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
230    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
231        let this = self.project();
232        *this.slices = this.buffer.as_ref().sys_slices();
233        opcode::Writev::new(
234            Fd(this.fd.as_fd().as_raw_fd()),
235            this.slices.as_ptr() as _,
236            this.slices.len().try_into().unwrap_or(u32::MAX),
237        )
238        .offset(*this.offset)
239        .build()
240        .into()
241    }
242}
243
244impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
245    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
246        let fd = self.fd.as_fd().as_raw_fd();
247        let slice = self.project().buffer.sys_slice_mut();
248        opcode::Read::new(
249            Fd(fd),
250            slice.ptr() as _,
251            slice.len().try_into().unwrap_or(u32::MAX),
252        )
253        .build()
254        .into()
255    }
256}
257
258impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
259    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
260        let this = self.project();
261        *this.slices = this.buffer.sys_slices_mut();
262        opcode::Readv::new(
263            Fd(this.fd.as_fd().as_raw_fd()),
264            this.slices.as_ptr() as _,
265            this.slices.len().try_into().unwrap_or(u32::MAX),
266        )
267        .build()
268        .into()
269    }
270}
271
272impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
273    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
274        let slice = self.buffer.as_init();
275        opcode::Write::new(
276            Fd(self.fd.as_fd().as_raw_fd()),
277            slice.as_ptr(),
278            slice.len().try_into().unwrap_or(u32::MAX),
279        )
280        .build()
281        .into()
282    }
283}
284
285impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
286    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
287        let this = self.project();
288        *this.slices = this.buffer.as_ref().sys_slices();
289        opcode::Writev::new(
290            Fd(this.fd.as_fd().as_raw_fd()),
291            this.slices.as_ptr() as _,
292            this.slices.len().try_into().unwrap_or(u32::MAX),
293        )
294        .build()
295        .into()
296    }
297}
298
299impl<S: AsFd> OpCode for Sync<S> {
300    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
301        opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
302            .flags(if self.datasync {
303                FsyncFlags::DATASYNC
304            } else {
305                FsyncFlags::empty()
306            })
307            .build()
308            .into()
309    }
310}
311
312impl OpCode for Unlink {
313    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
314        opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
315            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
316            .build()
317            .into()
318    }
319}
320
321impl OpCode for CreateDir {
322    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
323        opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
324            .mode(self.mode)
325            .build()
326            .into()
327    }
328}
329
330impl OpCode for Rename {
331    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
332        opcode::RenameAt::new(
333            Fd(libc::AT_FDCWD),
334            self.old_path.as_ptr(),
335            Fd(libc::AT_FDCWD),
336            self.new_path.as_ptr(),
337        )
338        .build()
339        .into()
340    }
341}
342
343impl OpCode for Symlink {
344    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
345        opcode::SymlinkAt::new(
346            Fd(libc::AT_FDCWD),
347            self.source.as_ptr(),
348            self.target.as_ptr(),
349        )
350        .build()
351        .into()
352    }
353}
354
355impl OpCode for HardLink {
356    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
357        opcode::LinkAt::new(
358            Fd(libc::AT_FDCWD),
359            self.source.as_ptr(),
360            Fd(libc::AT_FDCWD),
361            self.target.as_ptr(),
362        )
363        .build()
364        .into()
365    }
366}
367
368impl OpCode for CreateSocket {
369    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
370        if super::is_op_supported(opcode::Socket::CODE) {
371            opcode::Socket::new(
372                self.domain,
373                self.socket_type | libc::SOCK_CLOEXEC,
374                self.protocol,
375            )
376            .build()
377            .into()
378        } else {
379            OpEntry::Blocking
380        }
381    }
382
383    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
384        Ok(syscall!(libc::socket(
385            self.domain,
386            self.socket_type | libc::SOCK_CLOEXEC,
387            self.protocol
388        ))? as _)
389    }
390}
391
392impl<S: AsFd> OpCode for ShutdownSocket<S> {
393    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
394        opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
395            .build()
396            .into()
397    }
398}
399
400impl OpCode for CloseSocket {
401    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
402        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
403            .build()
404            .into()
405    }
406}
407
408impl<S: AsFd> OpCode for Accept<S> {
409    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
410        let this = self.project();
411        opcode::Accept::new(
412            Fd(this.fd.as_fd().as_raw_fd()),
413            unsafe { this.buffer.view_as::<libc::sockaddr>() },
414            this.addr_len,
415        )
416        .flags(libc::SOCK_CLOEXEC)
417        .build()
418        .into()
419    }
420
421    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
422        // SAFETY: fd is a valid fd returned from kernel
423        let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
424        *self.project().accepted_fd = Some(fd);
425    }
426}
427
428impl<S: AsFd> OpCode for Connect<S> {
429    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
430        opcode::Connect::new(
431            Fd(self.fd.as_fd().as_raw_fd()),
432            self.addr.as_ptr().cast(),
433            self.addr.len(),
434        )
435        .build()
436        .into()
437    }
438}
439
440impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
441    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
442        let fd = self.fd.as_fd().as_raw_fd();
443        let flags = self.flags;
444        let slice = self.project().buffer.sys_slice_mut();
445        opcode::Recv::new(
446            Fd(fd),
447            slice.ptr() as _,
448            slice.len().try_into().unwrap_or(u32::MAX),
449        )
450        .flags(flags)
451        .build()
452        .into()
453    }
454}
455
456impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
457    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
458        self.as_mut().set_msg();
459        let this = self.project();
460        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
461            .flags(*this.flags as _)
462            .build()
463            .into()
464    }
465}
466
467impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
468    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
469        let slice = self.buffer.as_init();
470        opcode::Send::new(
471            Fd(self.fd.as_fd().as_raw_fd()),
472            slice.as_ptr(),
473            slice.len().try_into().unwrap_or(u32::MAX),
474        )
475        .flags(self.flags)
476        .build()
477        .into()
478    }
479}
480
481impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
482    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
483        self.as_mut().set_msg();
484        let this = self.project();
485        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
486            .flags(*this.flags as _)
487            .build()
488            .into()
489    }
490}
491
492struct RecvFromHeader<S> {
493    pub(crate) fd: S,
494    pub(crate) addr: SockAddrStorage,
495    pub(crate) msg: libc::msghdr,
496    pub(crate) flags: i32,
497    _p: PhantomPinned,
498}
499
500impl<S> RecvFromHeader<S> {
501    pub fn new(fd: S, flags: i32) -> Self {
502        Self {
503            fd,
504            addr: SockAddrStorage::zeroed(),
505            msg: unsafe { std::mem::zeroed() },
506            flags,
507            _p: PhantomPinned,
508        }
509    }
510}
511
512impl<S: AsFd> RecvFromHeader<S> {
513    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
514        self.msg.msg_name = &mut self.addr as *mut _ as _;
515        self.msg.msg_namelen = self.addr.size_of() as _;
516        self.msg.msg_iov = slices.as_mut_ptr() as _;
517        self.msg.msg_iovlen = slices.len() as _;
518        opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
519            .flags(self.flags as _)
520            .build()
521            .into()
522    }
523
524    pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
525        (self.addr, self.msg.msg_namelen)
526    }
527}
528
529pin_project! {
530    /// Receive data and source address.
531    pub struct RecvFrom<T: IoBufMut, S> {
532        header: RecvFromHeader<S>,
533        #[pin]
534        buffer: T,
535        slice: Option<SysSlice>,
536    }
537}
538
539impl<T: IoBufMut, S> RecvFrom<T, S> {
540    /// Create [`RecvFrom`].
541    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
542        Self {
543            header: RecvFromHeader::new(fd, flags),
544            buffer,
545            slice: None,
546        }
547    }
548}
549
550impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
551    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
552        let this = self.project();
553        let slice = this.slice.insert(this.buffer.sys_slice_mut());
554        this.header.create_entry(std::slice::from_mut(slice))
555    }
556}
557
558impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
559    type Inner = (T, SockAddrStorage, socklen_t);
560
561    fn into_inner(self) -> Self::Inner {
562        let (addr, addr_len) = self.header.into_addr();
563        (self.buffer, addr, addr_len)
564    }
565}
566
567pin_project! {
568    /// Receive data and source address into vectored buffer.
569    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
570        header: RecvFromHeader<S>,
571        #[pin]
572        buffer: T,
573        slice: Vec<SysSlice>,
574    }
575}
576
577impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
578    /// Create [`RecvFromVectored`].
579    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
580        Self {
581            header: RecvFromHeader::new(fd, flags),
582            buffer,
583            slice: vec![],
584        }
585    }
586}
587
588impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
589    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
590        let this = self.project();
591        *this.slice = this.buffer.sys_slices_mut();
592        this.header.create_entry(this.slice)
593    }
594}
595
596impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
597    type Inner = (T, SockAddrStorage, socklen_t);
598
599    fn into_inner(self) -> Self::Inner {
600        let (addr, addr_len) = self.header.into_addr();
601        (self.buffer, addr, addr_len)
602    }
603}
604
605struct SendToHeader<S> {
606    pub(crate) fd: S,
607    pub(crate) addr: SockAddr,
608    pub(crate) msg: libc::msghdr,
609    pub(crate) flags: i32,
610    _p: PhantomPinned,
611}
612
613impl<S> SendToHeader<S> {
614    pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
615        Self {
616            fd,
617            addr,
618            msg: unsafe { std::mem::zeroed() },
619            flags,
620            _p: PhantomPinned,
621        }
622    }
623}
624
625impl<S: AsFd> SendToHeader<S> {
626    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
627        self.msg.msg_name = self.addr.as_ptr() as _;
628        self.msg.msg_namelen = self.addr.len();
629        self.msg.msg_iov = slices.as_mut_ptr() as _;
630        self.msg.msg_iovlen = slices.len() as _;
631        opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
632            .flags(self.flags as _)
633            .build()
634            .into()
635    }
636}
637
638pin_project! {
639    /// Send data to specified address.
640    pub struct SendTo<T: IoBuf, S> {
641        header: SendToHeader<S>,
642        #[pin]
643        buffer: T,
644        slice: Option<SysSlice>,
645    }
646}
647
648impl<T: IoBuf, S> SendTo<T, S> {
649    /// Create [`SendTo`].
650    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
651        Self {
652            header: SendToHeader::new(fd, addr, flags),
653            buffer,
654            slice: None,
655        }
656    }
657}
658
659impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
660    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
661        let this = self.project();
662        let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
663        this.header.create_entry(std::slice::from_mut(slice))
664    }
665}
666
667impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
668    type Inner = T;
669
670    fn into_inner(self) -> Self::Inner {
671        self.buffer
672    }
673}
674
675pin_project! {
676    /// Send data to specified address from vectored buffer.
677    pub struct SendToVectored<T: IoVectoredBuf, S> {
678        header: SendToHeader<S>,
679        #[pin]
680        buffer: T,
681        slice: Vec<SysSlice>,
682    }
683}
684
685impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
686    /// Create [`SendToVectored`].
687    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
688        Self {
689            header: SendToHeader::new(fd, addr, flags),
690            buffer,
691            slice: vec![],
692        }
693    }
694}
695
696impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
697    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
698        let this = self.project();
699        *this.slice = this.buffer.as_ref().sys_slices();
700        this.header.create_entry(this.slice)
701    }
702}
703
704impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
705    type Inner = T;
706
707    fn into_inner(self) -> Self::Inner {
708        self.buffer
709    }
710}
711
712impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
713    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
714        self.as_mut().set_msg();
715        let this = self.project();
716        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
717            .flags(*this.flags as _)
718            .build()
719            .into()
720    }
721}
722
723impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
724    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
725        self.as_mut().set_msg();
726        let this = self.project();
727        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
728            .flags(*this.flags as _)
729            .build()
730            .into()
731    }
732}
733
734impl<S: AsFd> OpCode for PollOnce<S> {
735    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
736        let flags = match self.interest {
737            Interest::Readable => libc::POLLIN,
738            Interest::Writable => libc::POLLOUT,
739        };
740        opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
741            .build()
742            .into()
743    }
744}
745
746impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
747    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
748        opcode::Splice::new(
749            Fd(self.fd_in.as_fd().as_raw_fd()),
750            self.offset_in,
751            Fd(self.fd_out.as_fd().as_raw_fd()),
752            self.offset_out,
753            self.len.try_into().unwrap_or(u32::MAX),
754        )
755        .flags(self.flags)
756        .build()
757        .into()
758    }
759}
760
761mod buf_ring {
762    use std::{
763        io,
764        marker::PhantomPinned,
765        os::fd::{AsFd, AsRawFd},
766        pin::Pin,
767        ptr,
768    };
769
770    use io_uring::{opcode, squeue::Flags, types::Fd};
771
772    use super::OpCode;
773    use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
774
775    /// Read a file at specified position into specified buffer.
776    #[derive(Debug)]
777    pub struct ReadManagedAt<S> {
778        pub(crate) fd: S,
779        pub(crate) offset: u64,
780        buffer_group: u16,
781        len: u32,
782        _p: PhantomPinned,
783    }
784
785    impl<S> ReadManagedAt<S> {
786        /// Create [`ReadManagedAt`].
787        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
788            #[cfg(fusion)]
789            let buffer_pool = buffer_pool.as_io_uring();
790            Ok(Self {
791                fd,
792                offset,
793                buffer_group: buffer_pool.buffer_group(),
794                len: len.try_into().map_err(|_| {
795                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
796                })?,
797                _p: PhantomPinned,
798            })
799        }
800    }
801
802    impl<S: AsFd> OpCode for ReadManagedAt<S> {
803        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
804            let fd = Fd(self.fd.as_fd().as_raw_fd());
805            let offset = self.offset;
806            opcode::Read::new(fd, ptr::null_mut(), self.len)
807                .offset(offset)
808                .buf_group(self.buffer_group)
809                .build()
810                .flags(Flags::BUFFER_SELECT)
811                .into()
812        }
813    }
814
815    impl<S> TakeBuffer for ReadManagedAt<S> {
816        type Buffer<'a> = BorrowedBuffer<'a>;
817        type BufferPool = BufferPool;
818
819        fn take_buffer(
820            self,
821            buffer_pool: &Self::BufferPool,
822            result: io::Result<usize>,
823            buffer_id: u16,
824        ) -> io::Result<Self::Buffer<'_>> {
825            #[cfg(fusion)]
826            let buffer_pool = buffer_pool.as_io_uring();
827            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
828            // SAFETY: result is valid
829            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
830            #[cfg(fusion)]
831            let res = res.map(BorrowedBuffer::new_io_uring);
832            res
833        }
834    }
835
836    /// Receive data from remote.
837    pub struct ReadManaged<S> {
838        fd: S,
839        buffer_group: u16,
840        len: u32,
841        _p: PhantomPinned,
842    }
843
844    impl<S> ReadManaged<S> {
845        /// Create [`ReadManaged`].
846        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
847            #[cfg(fusion)]
848            let buffer_pool = buffer_pool.as_io_uring();
849            Ok(Self {
850                fd,
851                buffer_group: buffer_pool.buffer_group(),
852                len: len.try_into().map_err(|_| {
853                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
854                })?,
855                _p: PhantomPinned,
856            })
857        }
858    }
859
860    impl<S: AsFd> OpCode for ReadManaged<S> {
861        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
862            let fd = self.fd.as_fd().as_raw_fd();
863            opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
864                .buf_group(self.buffer_group)
865                .build()
866                .flags(Flags::BUFFER_SELECT)
867                .into()
868        }
869    }
870
871    impl<S> TakeBuffer for ReadManaged<S> {
872        type Buffer<'a> = BorrowedBuffer<'a>;
873        type BufferPool = BufferPool;
874
875        fn take_buffer(
876            self,
877            buffer_pool: &Self::BufferPool,
878            result: io::Result<usize>,
879            buffer_id: u16,
880        ) -> io::Result<Self::Buffer<'_>> {
881            #[cfg(fusion)]
882            let buffer_pool = buffer_pool.as_io_uring();
883            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
884            // SAFETY: result is valid
885            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
886            #[cfg(fusion)]
887            let res = res.map(BorrowedBuffer::new_io_uring);
888            res
889        }
890    }
891
892    /// Receive data from remote.
893    pub struct RecvManaged<S> {
894        fd: S,
895        buffer_group: u16,
896        len: u32,
897        flags: i32,
898        _p: PhantomPinned,
899    }
900
901    impl<S> RecvManaged<S> {
902        /// Create [`RecvManaged`].
903        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
904            #[cfg(fusion)]
905            let buffer_pool = buffer_pool.as_io_uring();
906            Ok(Self {
907                fd,
908                buffer_group: buffer_pool.buffer_group(),
909                len: len.try_into().map_err(|_| {
910                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
911                })?,
912                flags,
913                _p: PhantomPinned,
914            })
915        }
916    }
917
918    impl<S: AsFd> OpCode for RecvManaged<S> {
919        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
920            let fd = self.fd.as_fd().as_raw_fd();
921            opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
922                .flags(self.flags)
923                .buf_group(self.buffer_group)
924                .build()
925                .flags(Flags::BUFFER_SELECT)
926                .into()
927        }
928    }
929
930    impl<S> TakeBuffer for RecvManaged<S> {
931        type Buffer<'a> = BorrowedBuffer<'a>;
932        type BufferPool = BufferPool;
933
934        fn take_buffer(
935            self,
936            buffer_pool: &Self::BufferPool,
937            result: io::Result<usize>,
938            buffer_id: u16,
939        ) -> io::Result<Self::Buffer<'_>> {
940            #[cfg(fusion)]
941            let buffer_pool = buffer_pool.as_io_uring();
942            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
943            // SAFETY: result is valid
944            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
945            #[cfg(fusion)]
946            let res = res.map(BorrowedBuffer::new_io_uring);
947            res
948        }
949    }
950}
951
952pub use buf_ring::{ReadManaged, ReadManagedAt, RecvManaged};