compio_driver/iour/
op.rs

1use std::{
2    ffi::CString,
3    io,
4    marker::PhantomPinned,
5    os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6    pin::Pin,
7};
8
9use compio_buf::{
10    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
11};
12use io_uring::{
13    opcode,
14    types::{Fd, FsyncFlags},
15};
16use socket2::{SockAddr, SockAddrStorage, socklen_t};
17
18use super::OpCode;
19pub use crate::unix::op::*;
20use crate::{OpEntry, op::*, syscall};
21
22impl<
23    D: std::marker::Send + 'static,
24    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
25> OpCode for Asyncify<F, D>
26{
27    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
28        OpEntry::Blocking
29    }
30
31    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
32        // Safety: self won't be moved
33        let this = unsafe { self.get_unchecked_mut() };
34        let f = this
35            .f
36            .take()
37            .expect("the operate method could only be called once");
38        let BufResult(res, data) = f();
39        this.data = Some(data);
40        res
41    }
42}
43
44impl OpCode for OpenFile {
45    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
46        opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
47            .flags(self.flags | libc::O_CLOEXEC)
48            .mode(self.mode)
49            .build()
50            .into()
51    }
52}
53
54impl OpCode for CloseFile {
55    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
56        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
57            .build()
58            .into()
59    }
60}
61
62/// Get metadata of an opened file.
63pub struct FileStat<S> {
64    pub(crate) fd: S,
65    pub(crate) stat: Statx,
66}
67
68impl<S> FileStat<S> {
69    /// Create [`FileStat`].
70    pub fn new(fd: S) -> Self {
71        Self {
72            fd,
73            stat: unsafe { std::mem::zeroed() },
74        }
75    }
76}
77
78impl<S: AsFd> OpCode for FileStat<S> {
79    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
80        let this = unsafe { self.get_unchecked_mut() };
81        static EMPTY_NAME: &[u8] = b"\0";
82        opcode::Statx::new(
83            Fd(this.fd.as_fd().as_fd().as_raw_fd()),
84            EMPTY_NAME.as_ptr().cast(),
85            std::ptr::addr_of_mut!(this.stat).cast(),
86        )
87        .flags(libc::AT_EMPTY_PATH)
88        .build()
89        .into()
90    }
91}
92
93impl<S> IntoInner for FileStat<S> {
94    type Inner = libc::stat;
95
96    fn into_inner(self) -> Self::Inner {
97        statx_to_stat(self.stat)
98    }
99}
100
101/// Get metadata from path.
102pub struct PathStat {
103    pub(crate) path: CString,
104    pub(crate) stat: Statx,
105    pub(crate) follow_symlink: bool,
106}
107
108impl PathStat {
109    /// Create [`PathStat`].
110    pub fn new(path: CString, follow_symlink: bool) -> Self {
111        Self {
112            path,
113            stat: unsafe { std::mem::zeroed() },
114            follow_symlink,
115        }
116    }
117}
118
119impl OpCode for PathStat {
120    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
121        let mut flags = libc::AT_EMPTY_PATH;
122        if !self.follow_symlink {
123            flags |= libc::AT_SYMLINK_NOFOLLOW;
124        }
125        opcode::Statx::new(
126            Fd(libc::AT_FDCWD),
127            self.path.as_ptr(),
128            std::ptr::addr_of_mut!(self.stat).cast(),
129        )
130        .flags(flags)
131        .build()
132        .into()
133    }
134}
135
136impl IntoInner for PathStat {
137    type Inner = libc::stat;
138
139    fn into_inner(self) -> Self::Inner {
140        statx_to_stat(self.stat)
141    }
142}
143
144impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
145    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
146        let fd = Fd(self.fd.as_fd().as_raw_fd());
147        let offset = self.offset;
148        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
149        opcode::Read::new(fd, slice.as_mut_ptr() as _, slice.len() as _)
150            .offset(offset)
151            .build()
152            .into()
153    }
154}
155
156impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
157    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
158        let this = unsafe { self.get_unchecked_mut() };
159        this.slices = unsafe { this.buffer.io_slices_mut() };
160        opcode::Readv::new(
161            Fd(this.fd.as_fd().as_raw_fd()),
162            this.slices.as_ptr() as _,
163            this.slices.len() as _,
164        )
165        .offset(this.offset)
166        .build()
167        .into()
168    }
169}
170
171impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
172    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
173        let slice = self.buffer.as_slice();
174        opcode::Write::new(
175            Fd(self.fd.as_fd().as_raw_fd()),
176            slice.as_ptr(),
177            slice.len() as _,
178        )
179        .offset(self.offset)
180        .build()
181        .into()
182    }
183}
184
185impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
186    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
187        let this = unsafe { self.get_unchecked_mut() };
188        this.slices = unsafe { this.buffer.io_slices() };
189        opcode::Writev::new(
190            Fd(this.fd.as_fd().as_raw_fd()),
191            this.slices.as_ptr() as _,
192            this.slices.len() as _,
193        )
194        .offset(this.offset)
195        .build()
196        .into()
197    }
198}
199
200impl<S: AsFd> OpCode for Sync<S> {
201    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
202        opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
203            .flags(if self.datasync {
204                FsyncFlags::DATASYNC
205            } else {
206                FsyncFlags::empty()
207            })
208            .build()
209            .into()
210    }
211}
212
213impl OpCode for Unlink {
214    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
215        opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
216            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
217            .build()
218            .into()
219    }
220}
221
222impl OpCode for CreateDir {
223    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
224        opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
225            .mode(self.mode)
226            .build()
227            .into()
228    }
229}
230
231impl OpCode for Rename {
232    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
233        opcode::RenameAt::new(
234            Fd(libc::AT_FDCWD),
235            self.old_path.as_ptr(),
236            Fd(libc::AT_FDCWD),
237            self.new_path.as_ptr(),
238        )
239        .build()
240        .into()
241    }
242}
243
244impl OpCode for Symlink {
245    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
246        opcode::SymlinkAt::new(
247            Fd(libc::AT_FDCWD),
248            self.source.as_ptr(),
249            self.target.as_ptr(),
250        )
251        .build()
252        .into()
253    }
254}
255
256impl OpCode for HardLink {
257    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
258        opcode::LinkAt::new(
259            Fd(libc::AT_FDCWD),
260            self.source.as_ptr(),
261            Fd(libc::AT_FDCWD),
262            self.target.as_ptr(),
263        )
264        .build()
265        .into()
266    }
267}
268
269impl OpCode for CreateSocket {
270    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
271        if super::is_op_supported(opcode::Socket::CODE) {
272            opcode::Socket::new(
273                self.domain,
274                self.socket_type | libc::SOCK_CLOEXEC,
275                self.protocol,
276            )
277            .build()
278            .into()
279        } else {
280            OpEntry::Blocking
281        }
282    }
283
284    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
285        Ok(syscall!(libc::socket(
286            self.domain,
287            self.socket_type | libc::SOCK_CLOEXEC,
288            self.protocol
289        ))? as _)
290    }
291}
292
293impl<S: AsFd> OpCode for ShutdownSocket<S> {
294    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
295        opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
296            .build()
297            .into()
298    }
299}
300
301impl OpCode for CloseSocket {
302    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
303        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
304            .build()
305            .into()
306    }
307}
308
309impl<S: AsFd> OpCode for Accept<S> {
310    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
311        let this = unsafe { self.get_unchecked_mut() };
312        opcode::Accept::new(
313            Fd(this.fd.as_fd().as_raw_fd()),
314            unsafe { this.buffer.view_as::<libc::sockaddr>() },
315            &mut this.addr_len,
316        )
317        .flags(libc::SOCK_CLOEXEC)
318        .build()
319        .into()
320    }
321
322    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
323        self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
324    }
325}
326
327impl<S: AsFd> OpCode for Connect<S> {
328    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
329        opcode::Connect::new(
330            Fd(self.fd.as_fd().as_raw_fd()),
331            self.addr.as_ptr().cast(),
332            self.addr.len(),
333        )
334        .build()
335        .into()
336    }
337}
338
339impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
340    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
341        let fd = self.fd.as_fd().as_raw_fd();
342        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
343        opcode::Read::new(Fd(fd), slice.as_mut_ptr() as _, slice.len() as _)
344            .build()
345            .into()
346    }
347}
348
349impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
350    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
351        let this = unsafe { self.get_unchecked_mut() };
352        this.slices = unsafe { this.buffer.io_slices_mut() };
353        opcode::Readv::new(
354            Fd(this.fd.as_fd().as_raw_fd()),
355            this.slices.as_ptr() as _,
356            this.slices.len() as _,
357        )
358        .build()
359        .into()
360    }
361}
362
363impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
364    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
365        let slice = self.buffer.as_slice();
366        opcode::Write::new(
367            Fd(self.fd.as_fd().as_raw_fd()),
368            slice.as_ptr(),
369            slice.len() as _,
370        )
371        .build()
372        .into()
373    }
374}
375
376impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
377    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
378        let this = unsafe { self.get_unchecked_mut() };
379        this.slices = unsafe { this.buffer.io_slices() };
380        opcode::Writev::new(
381            Fd(this.fd.as_fd().as_raw_fd()),
382            this.slices.as_ptr() as _,
383            this.slices.len() as _,
384        )
385        .build()
386        .into()
387    }
388}
389
390struct RecvFromHeader<S> {
391    pub(crate) fd: S,
392    pub(crate) addr: SockAddrStorage,
393    pub(crate) msg: libc::msghdr,
394    _p: PhantomPinned,
395}
396
397impl<S> RecvFromHeader<S> {
398    pub fn new(fd: S) -> Self {
399        Self {
400            fd,
401            addr: SockAddrStorage::zeroed(),
402            msg: unsafe { std::mem::zeroed() },
403            _p: PhantomPinned,
404        }
405    }
406}
407
408impl<S: AsFd> RecvFromHeader<S> {
409    pub fn create_entry(&mut self, slices: &mut [IoSliceMut]) -> OpEntry {
410        self.msg.msg_name = &mut self.addr as *mut _ as _;
411        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
412        self.msg.msg_iov = slices.as_mut_ptr() as _;
413        self.msg.msg_iovlen = slices.len() as _;
414        opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
415            .build()
416            .into()
417    }
418
419    pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
420        (self.addr, self.msg.msg_namelen)
421    }
422}
423
424/// Receive data and source address.
425pub struct RecvFrom<T: IoBufMut, S> {
426    header: RecvFromHeader<S>,
427    buffer: T,
428    slice: [IoSliceMut; 1],
429}
430
431impl<T: IoBufMut, S> RecvFrom<T, S> {
432    /// Create [`RecvFrom`].
433    pub fn new(fd: S, buffer: T) -> Self {
434        Self {
435            header: RecvFromHeader::new(fd),
436            buffer,
437            // SAFETY: We never use this slice.
438            slice: [unsafe { IoSliceMut::from_slice(&mut []) }],
439        }
440    }
441}
442
443impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
444    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
445        let this = unsafe { self.get_unchecked_mut() };
446        this.slice[0] = unsafe { this.buffer.as_io_slice_mut() };
447        this.header.create_entry(&mut this.slice)
448    }
449}
450
451impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
452    type Inner = (T, SockAddrStorage, socklen_t);
453
454    fn into_inner(self) -> Self::Inner {
455        let (addr, addr_len) = self.header.into_addr();
456        (self.buffer, addr, addr_len)
457    }
458}
459
460/// Receive data and source address into vectored buffer.
461pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
462    header: RecvFromHeader<S>,
463    buffer: T,
464    slice: Vec<IoSliceMut>,
465}
466
467impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
468    /// Create [`RecvFromVectored`].
469    pub fn new(fd: S, buffer: T) -> Self {
470        Self {
471            header: RecvFromHeader::new(fd),
472            buffer,
473            slice: vec![],
474        }
475    }
476}
477
478impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
479    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
480        let this = unsafe { self.get_unchecked_mut() };
481        this.slice = unsafe { this.buffer.io_slices_mut() };
482        this.header.create_entry(&mut this.slice)
483    }
484}
485
486impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
487    type Inner = (T, SockAddrStorage, socklen_t);
488
489    fn into_inner(self) -> Self::Inner {
490        let (addr, addr_len) = self.header.into_addr();
491        (self.buffer, addr, addr_len)
492    }
493}
494
495struct SendToHeader<S> {
496    pub(crate) fd: S,
497    pub(crate) addr: SockAddr,
498    pub(crate) msg: libc::msghdr,
499    _p: PhantomPinned,
500}
501
502impl<S> SendToHeader<S> {
503    pub fn new(fd: S, addr: SockAddr) -> Self {
504        Self {
505            fd,
506            addr,
507            msg: unsafe { std::mem::zeroed() },
508            _p: PhantomPinned,
509        }
510    }
511}
512
513impl<S: AsFd> SendToHeader<S> {
514    pub fn create_entry(&mut self, slices: &mut [IoSlice]) -> OpEntry {
515        self.msg.msg_name = self.addr.as_ptr() as _;
516        self.msg.msg_namelen = self.addr.len();
517        self.msg.msg_iov = slices.as_mut_ptr() as _;
518        self.msg.msg_iovlen = slices.len() as _;
519        opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
520            .build()
521            .into()
522    }
523}
524
525/// Send data to specified address.
526pub struct SendTo<T: IoBuf, S> {
527    header: SendToHeader<S>,
528    buffer: T,
529    slice: [IoSlice; 1],
530}
531
532impl<T: IoBuf, S> SendTo<T, S> {
533    /// Create [`SendTo`].
534    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
535        Self {
536            header: SendToHeader::new(fd, addr),
537            buffer,
538            // SAFETY: We never use this slice.
539            slice: [unsafe { IoSlice::from_slice(&[]) }],
540        }
541    }
542}
543
544impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
545    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
546        let this = unsafe { self.get_unchecked_mut() };
547        this.slice[0] = unsafe { this.buffer.as_io_slice() };
548        this.header.create_entry(&mut this.slice)
549    }
550}
551
552impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
553    type Inner = T;
554
555    fn into_inner(self) -> Self::Inner {
556        self.buffer
557    }
558}
559
560/// Send data to specified address from vectored buffer.
561pub struct SendToVectored<T: IoVectoredBuf, S> {
562    header: SendToHeader<S>,
563    buffer: T,
564    slice: Vec<IoSlice>,
565}
566
567impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
568    /// Create [`SendToVectored`].
569    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
570        Self {
571            header: SendToHeader::new(fd, addr),
572            buffer,
573            slice: vec![],
574        }
575    }
576}
577
578impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
579    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
580        let this = unsafe { self.get_unchecked_mut() };
581        this.slice = unsafe { this.buffer.io_slices() };
582        this.header.create_entry(&mut this.slice)
583    }
584}
585
586impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
587    type Inner = T;
588
589    fn into_inner(self) -> Self::Inner {
590        self.buffer
591    }
592}
593
594impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
595    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
596        let this = unsafe { self.get_unchecked_mut() };
597        unsafe { this.set_msg() };
598        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &mut this.msg)
599            .build()
600            .into()
601    }
602}
603
604impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
605    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
606        let this = unsafe { self.get_unchecked_mut() };
607        unsafe { this.set_msg() };
608        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &this.msg)
609            .build()
610            .into()
611    }
612}
613
614impl<S: AsFd> OpCode for PollOnce<S> {
615    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
616        let flags = match self.interest {
617            Interest::Readable => libc::POLLIN,
618            Interest::Writable => libc::POLLOUT,
619        };
620        opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
621            .build()
622            .into()
623    }
624}
625
626mod buf_ring {
627    use std::{
628        io,
629        marker::PhantomPinned,
630        os::fd::{AsFd, AsRawFd},
631        pin::Pin,
632        ptr,
633    };
634
635    use io_uring::{opcode, squeue::Flags, types::Fd};
636
637    use super::OpCode;
638    use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
639
640    /// Read a file at specified position into specified buffer.
641    #[derive(Debug)]
642    pub struct ReadManagedAt<S> {
643        pub(crate) fd: S,
644        pub(crate) offset: u64,
645        buffer_group: u16,
646        len: u32,
647        _p: PhantomPinned,
648    }
649
650    impl<S> ReadManagedAt<S> {
651        /// Create [`ReadManagedAt`].
652        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
653            #[cfg(fusion)]
654            let buffer_pool = buffer_pool.as_io_uring();
655            Ok(Self {
656                fd,
657                offset,
658                buffer_group: buffer_pool.buffer_group(),
659                len: len.try_into().map_err(|_| {
660                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
661                })?,
662                _p: PhantomPinned,
663            })
664        }
665    }
666
667    impl<S: AsFd> OpCode for ReadManagedAt<S> {
668        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
669            let fd = Fd(self.fd.as_fd().as_raw_fd());
670            let offset = self.offset;
671            opcode::Read::new(fd, ptr::null_mut(), self.len)
672                .offset(offset)
673                .buf_group(self.buffer_group)
674                .build()
675                .flags(Flags::BUFFER_SELECT)
676                .into()
677        }
678    }
679
680    impl<S> TakeBuffer for ReadManagedAt<S> {
681        type Buffer<'a> = BorrowedBuffer<'a>;
682        type BufferPool = BufferPool;
683
684        fn take_buffer(
685            self,
686            buffer_pool: &Self::BufferPool,
687            result: io::Result<usize>,
688            flags: u32,
689        ) -> io::Result<Self::Buffer<'_>> {
690            #[cfg(fusion)]
691            let buffer_pool = buffer_pool.as_io_uring();
692            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
693            // Safety: result is valid
694            let res = unsafe { buffer_pool.get_buffer(flags, result) };
695            #[cfg(fusion)]
696            let res = res.map(BorrowedBuffer::new_io_uring);
697            res
698        }
699    }
700
701    /// Receive data from remote.
702    pub struct RecvManaged<S> {
703        fd: S,
704        buffer_group: u16,
705        len: u32,
706        _p: PhantomPinned,
707    }
708
709    impl<S> RecvManaged<S> {
710        /// Create [`RecvBufferPool`].
711        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
712            #[cfg(fusion)]
713            let buffer_pool = buffer_pool.as_io_uring();
714            Ok(Self {
715                fd,
716                buffer_group: buffer_pool.buffer_group(),
717                len: len.try_into().map_err(|_| {
718                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
719                })?,
720                _p: PhantomPinned,
721            })
722        }
723    }
724
725    impl<S: AsFd> OpCode for RecvManaged<S> {
726        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
727            let fd = self.fd.as_fd().as_raw_fd();
728            opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
729                .buf_group(self.buffer_group)
730                .build()
731                .flags(Flags::BUFFER_SELECT)
732                .into()
733        }
734    }
735
736    impl<S> TakeBuffer for RecvManaged<S> {
737        type Buffer<'a> = BorrowedBuffer<'a>;
738        type BufferPool = BufferPool;
739
740        fn take_buffer(
741            self,
742            buffer_pool: &Self::BufferPool,
743            result: io::Result<usize>,
744            flags: u32,
745        ) -> io::Result<Self::Buffer<'_>> {
746            #[cfg(fusion)]
747            let buffer_pool = buffer_pool.as_io_uring();
748            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
749            // Safety: result is valid
750            let res = unsafe { buffer_pool.get_buffer(flags, result) };
751            #[cfg(fusion)]
752            let res = res.map(BorrowedBuffer::new_io_uring);
753            res
754        }
755    }
756}
757
758pub use buf_ring::{ReadManagedAt, RecvManaged};