compio_driver/iour/
op.rs

1use std::{
2    ffi::CString,
3    io,
4    marker::PhantomPinned,
5    os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6    pin::Pin,
7};
8
9use compio_buf::{
10    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
11};
12use io_uring::{
13    opcode,
14    types::{Fd, FsyncFlags},
15};
16use libc::{sockaddr_storage, socklen_t};
17use socket2::SockAddr;
18
19use super::OpCode;
20pub use crate::unix::op::*;
21use crate::{OpEntry, op::*, syscall};
22
23impl<
24    D: std::marker::Send + 'static,
25    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
26> OpCode for Asyncify<F, D>
27{
28    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
29        OpEntry::Blocking
30    }
31
32    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
33        // Safety: self won't be moved
34        let this = unsafe { self.get_unchecked_mut() };
35        let f = this
36            .f
37            .take()
38            .expect("the operate method could only be called once");
39        let BufResult(res, data) = f();
40        this.data = Some(data);
41        res
42    }
43}
44
45impl OpCode for OpenFile {
46    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
47        opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
48            .flags(self.flags | libc::O_CLOEXEC)
49            .mode(self.mode)
50            .build()
51            .into()
52    }
53}
54
55impl OpCode for CloseFile {
56    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
57        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
58            .build()
59            .into()
60    }
61}
62
63/// Get metadata of an opened file.
64pub struct FileStat<S> {
65    pub(crate) fd: S,
66    pub(crate) stat: Statx,
67}
68
69impl<S> FileStat<S> {
70    /// Create [`FileStat`].
71    pub fn new(fd: S) -> Self {
72        Self {
73            fd,
74            stat: unsafe { std::mem::zeroed() },
75        }
76    }
77}
78
79impl<S: AsFd> OpCode for FileStat<S> {
80    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
81        let this = unsafe { self.get_unchecked_mut() };
82        static EMPTY_NAME: &[u8] = b"\0";
83        opcode::Statx::new(
84            Fd(this.fd.as_fd().as_fd().as_raw_fd()),
85            EMPTY_NAME.as_ptr().cast(),
86            std::ptr::addr_of_mut!(this.stat).cast(),
87        )
88        .flags(libc::AT_EMPTY_PATH)
89        .build()
90        .into()
91    }
92}
93
94impl<S> IntoInner for FileStat<S> {
95    type Inner = libc::stat;
96
97    fn into_inner(self) -> Self::Inner {
98        statx_to_stat(self.stat)
99    }
100}
101
102/// Get metadata from path.
103pub struct PathStat {
104    pub(crate) path: CString,
105    pub(crate) stat: Statx,
106    pub(crate) follow_symlink: bool,
107}
108
109impl PathStat {
110    /// Create [`PathStat`].
111    pub fn new(path: CString, follow_symlink: bool) -> Self {
112        Self {
113            path,
114            stat: unsafe { std::mem::zeroed() },
115            follow_symlink,
116        }
117    }
118}
119
120impl OpCode for PathStat {
121    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
122        let mut flags = libc::AT_EMPTY_PATH;
123        if !self.follow_symlink {
124            flags |= libc::AT_SYMLINK_NOFOLLOW;
125        }
126        opcode::Statx::new(
127            Fd(libc::AT_FDCWD),
128            self.path.as_ptr(),
129            std::ptr::addr_of_mut!(self.stat).cast(),
130        )
131        .flags(flags)
132        .build()
133        .into()
134    }
135}
136
137impl IntoInner for PathStat {
138    type Inner = libc::stat;
139
140    fn into_inner(self) -> Self::Inner {
141        statx_to_stat(self.stat)
142    }
143}
144
145impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
146    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
147        let fd = Fd(self.fd.as_fd().as_raw_fd());
148        let offset = self.offset;
149        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
150        opcode::Read::new(fd, slice.as_mut_ptr() as _, slice.len() as _)
151            .offset(offset)
152            .build()
153            .into()
154    }
155}
156
157impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
158    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
159        let this = unsafe { self.get_unchecked_mut() };
160        this.slices = unsafe { this.buffer.io_slices_mut() };
161        opcode::Readv::new(
162            Fd(this.fd.as_fd().as_raw_fd()),
163            this.slices.as_ptr() as _,
164            this.slices.len() as _,
165        )
166        .offset(this.offset)
167        .build()
168        .into()
169    }
170}
171
172impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
173    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
174        let slice = self.buffer.as_slice();
175        opcode::Write::new(
176            Fd(self.fd.as_fd().as_raw_fd()),
177            slice.as_ptr(),
178            slice.len() as _,
179        )
180        .offset(self.offset)
181        .build()
182        .into()
183    }
184}
185
186impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
187    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
188        let this = unsafe { self.get_unchecked_mut() };
189        this.slices = unsafe { this.buffer.io_slices() };
190        opcode::Writev::new(
191            Fd(this.fd.as_fd().as_raw_fd()),
192            this.slices.as_ptr() as _,
193            this.slices.len() as _,
194        )
195        .offset(this.offset)
196        .build()
197        .into()
198    }
199}
200
201impl<S: AsFd> OpCode for Sync<S> {
202    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
203        opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
204            .flags(if self.datasync {
205                FsyncFlags::DATASYNC
206            } else {
207                FsyncFlags::empty()
208            })
209            .build()
210            .into()
211    }
212}
213
214impl OpCode for Unlink {
215    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
216        opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
217            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
218            .build()
219            .into()
220    }
221}
222
223impl OpCode for CreateDir {
224    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
225        opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
226            .mode(self.mode)
227            .build()
228            .into()
229    }
230}
231
232impl OpCode for Rename {
233    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
234        opcode::RenameAt::new(
235            Fd(libc::AT_FDCWD),
236            self.old_path.as_ptr(),
237            Fd(libc::AT_FDCWD),
238            self.new_path.as_ptr(),
239        )
240        .build()
241        .into()
242    }
243}
244
245impl OpCode for Symlink {
246    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
247        opcode::SymlinkAt::new(
248            Fd(libc::AT_FDCWD),
249            self.source.as_ptr(),
250            self.target.as_ptr(),
251        )
252        .build()
253        .into()
254    }
255}
256
257impl OpCode for HardLink {
258    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
259        opcode::LinkAt::new(
260            Fd(libc::AT_FDCWD),
261            self.source.as_ptr(),
262            Fd(libc::AT_FDCWD),
263            self.target.as_ptr(),
264        )
265        .build()
266        .into()
267    }
268}
269
270impl OpCode for CreateSocket {
271    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
272        opcode::Socket::new(
273            self.domain,
274            self.socket_type | libc::SOCK_CLOEXEC,
275            self.protocol,
276        )
277        .build()
278        .into()
279    }
280
281    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
282        Ok(syscall!(libc::socket(self.domain, self.socket_type, self.protocol))? as _)
283    }
284}
285
286impl<S: AsFd> OpCode for ShutdownSocket<S> {
287    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
288        opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
289            .build()
290            .into()
291    }
292}
293
294impl OpCode for CloseSocket {
295    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
296        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
297            .build()
298            .into()
299    }
300}
301
302impl<S: AsFd> OpCode for Accept<S> {
303    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
304        let this = unsafe { self.get_unchecked_mut() };
305        opcode::Accept::new(
306            Fd(this.fd.as_fd().as_raw_fd()),
307            &mut this.buffer as *mut sockaddr_storage as *mut libc::sockaddr,
308            &mut this.addr_len,
309        )
310        .flags(libc::SOCK_CLOEXEC)
311        .build()
312        .into()
313    }
314
315    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
316        self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
317    }
318}
319
320impl<S: AsFd> OpCode for Connect<S> {
321    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
322        opcode::Connect::new(
323            Fd(self.fd.as_fd().as_raw_fd()),
324            self.addr.as_ptr(),
325            self.addr.len(),
326        )
327        .build()
328        .into()
329    }
330}
331
332impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
333    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
334        let fd = self.fd.as_fd().as_raw_fd();
335        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
336        opcode::Read::new(Fd(fd), slice.as_mut_ptr() as _, slice.len() as _)
337            .build()
338            .into()
339    }
340}
341
342impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
343    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
344        let this = unsafe { self.get_unchecked_mut() };
345        this.slices = unsafe { this.buffer.io_slices_mut() };
346        opcode::Readv::new(
347            Fd(this.fd.as_fd().as_raw_fd()),
348            this.slices.as_ptr() as _,
349            this.slices.len() as _,
350        )
351        .build()
352        .into()
353    }
354}
355
356impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
357    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
358        let slice = self.buffer.as_slice();
359        opcode::Write::new(
360            Fd(self.fd.as_fd().as_raw_fd()),
361            slice.as_ptr(),
362            slice.len() as _,
363        )
364        .build()
365        .into()
366    }
367}
368
369impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
370    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
371        let this = unsafe { self.get_unchecked_mut() };
372        this.slices = unsafe { this.buffer.io_slices() };
373        opcode::Writev::new(
374            Fd(this.fd.as_fd().as_raw_fd()),
375            this.slices.as_ptr() as _,
376            this.slices.len() as _,
377        )
378        .build()
379        .into()
380    }
381}
382
383struct RecvFromHeader<S> {
384    pub(crate) fd: S,
385    pub(crate) addr: sockaddr_storage,
386    pub(crate) msg: libc::msghdr,
387    _p: PhantomPinned,
388}
389
390impl<S> RecvFromHeader<S> {
391    pub fn new(fd: S) -> Self {
392        Self {
393            fd,
394            addr: unsafe { std::mem::zeroed() },
395            msg: unsafe { std::mem::zeroed() },
396            _p: PhantomPinned,
397        }
398    }
399}
400
401impl<S: AsFd> RecvFromHeader<S> {
402    pub fn create_entry(&mut self, slices: &mut [IoSliceMut]) -> OpEntry {
403        self.msg.msg_name = &mut self.addr as *mut _ as _;
404        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
405        self.msg.msg_iov = slices.as_mut_ptr() as _;
406        self.msg.msg_iovlen = slices.len() as _;
407        opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
408            .build()
409            .into()
410    }
411
412    pub fn into_addr(self) -> (sockaddr_storage, socklen_t) {
413        (self.addr, self.msg.msg_namelen)
414    }
415}
416
417/// Receive data and source address.
418pub struct RecvFrom<T: IoBufMut, S> {
419    header: RecvFromHeader<S>,
420    buffer: T,
421    slice: [IoSliceMut; 1],
422}
423
424impl<T: IoBufMut, S> RecvFrom<T, S> {
425    /// Create [`RecvFrom`].
426    pub fn new(fd: S, buffer: T) -> Self {
427        Self {
428            header: RecvFromHeader::new(fd),
429            buffer,
430            // SAFETY: We never use this slice.
431            slice: [unsafe { IoSliceMut::from_slice(&mut []) }],
432        }
433    }
434}
435
436impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
437    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
438        let this = unsafe { self.get_unchecked_mut() };
439        this.slice[0] = unsafe { this.buffer.as_io_slice_mut() };
440        this.header.create_entry(&mut this.slice)
441    }
442}
443
444impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
445    type Inner = (T, sockaddr_storage, socklen_t);
446
447    fn into_inner(self) -> Self::Inner {
448        let (addr, addr_len) = self.header.into_addr();
449        (self.buffer, addr, addr_len)
450    }
451}
452
453/// Receive data and source address into vectored buffer.
454pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
455    header: RecvFromHeader<S>,
456    buffer: T,
457    slice: Vec<IoSliceMut>,
458}
459
460impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
461    /// Create [`RecvFromVectored`].
462    pub fn new(fd: S, buffer: T) -> Self {
463        Self {
464            header: RecvFromHeader::new(fd),
465            buffer,
466            slice: vec![],
467        }
468    }
469}
470
471impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
472    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
473        let this = unsafe { self.get_unchecked_mut() };
474        this.slice = unsafe { this.buffer.io_slices_mut() };
475        this.header.create_entry(&mut this.slice)
476    }
477}
478
479impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
480    type Inner = (T, sockaddr_storage, socklen_t);
481
482    fn into_inner(self) -> Self::Inner {
483        let (addr, addr_len) = self.header.into_addr();
484        (self.buffer, addr, addr_len)
485    }
486}
487
488struct SendToHeader<S> {
489    pub(crate) fd: S,
490    pub(crate) addr: SockAddr,
491    pub(crate) msg: libc::msghdr,
492    _p: PhantomPinned,
493}
494
495impl<S> SendToHeader<S> {
496    pub fn new(fd: S, addr: SockAddr) -> Self {
497        Self {
498            fd,
499            addr,
500            msg: unsafe { std::mem::zeroed() },
501            _p: PhantomPinned,
502        }
503    }
504}
505
506impl<S: AsFd> SendToHeader<S> {
507    pub fn create_entry(&mut self, slices: &mut [IoSlice]) -> OpEntry {
508        self.msg.msg_name = self.addr.as_ptr() as _;
509        self.msg.msg_namelen = self.addr.len();
510        self.msg.msg_iov = slices.as_mut_ptr() as _;
511        self.msg.msg_iovlen = slices.len() as _;
512        opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
513            .build()
514            .into()
515    }
516}
517
518/// Send data to specified address.
519pub struct SendTo<T: IoBuf, S> {
520    header: SendToHeader<S>,
521    buffer: T,
522    slice: [IoSlice; 1],
523}
524
525impl<T: IoBuf, S> SendTo<T, S> {
526    /// Create [`SendTo`].
527    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
528        Self {
529            header: SendToHeader::new(fd, addr),
530            buffer,
531            // SAFETY: We never use this slice.
532            slice: [unsafe { IoSlice::from_slice(&[]) }],
533        }
534    }
535}
536
537impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
538    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
539        let this = unsafe { self.get_unchecked_mut() };
540        this.slice[0] = unsafe { this.buffer.as_io_slice() };
541        this.header.create_entry(&mut this.slice)
542    }
543}
544
545impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
546    type Inner = T;
547
548    fn into_inner(self) -> Self::Inner {
549        self.buffer
550    }
551}
552
553/// Send data to specified address from vectored buffer.
554pub struct SendToVectored<T: IoVectoredBuf, S> {
555    header: SendToHeader<S>,
556    buffer: T,
557    slice: Vec<IoSlice>,
558}
559
560impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
561    /// Create [`SendToVectored`].
562    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
563        Self {
564            header: SendToHeader::new(fd, addr),
565            buffer,
566            slice: vec![],
567        }
568    }
569}
570
571impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
572    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
573        let this = unsafe { self.get_unchecked_mut() };
574        this.slice = unsafe { this.buffer.io_slices() };
575        this.header.create_entry(&mut this.slice)
576    }
577}
578
579impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
580    type Inner = T;
581
582    fn into_inner(self) -> Self::Inner {
583        self.buffer
584    }
585}
586
587impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
588    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
589        let this = unsafe { self.get_unchecked_mut() };
590        unsafe { this.set_msg() };
591        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &mut this.msg)
592            .build()
593            .into()
594    }
595}
596
597impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
598    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
599        let this = unsafe { self.get_unchecked_mut() };
600        unsafe { this.set_msg() };
601        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &this.msg)
602            .build()
603            .into()
604    }
605}
606
607impl<S: AsFd> OpCode for PollOnce<S> {
608    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
609        let flags = match self.interest {
610            Interest::Readable => libc::POLLIN,
611            Interest::Writable => libc::POLLOUT,
612        };
613        opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
614            .build()
615            .into()
616    }
617}
618
619#[cfg(io_uring)]
620mod buf_ring {
621    use std::{
622        io,
623        marker::PhantomPinned,
624        os::fd::{AsFd, AsRawFd},
625        pin::Pin,
626        ptr,
627    };
628
629    use io_uring::{opcode, squeue::Flags, types::Fd};
630
631    use super::OpCode;
632    use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
633
634    /// Read a file at specified position into specified buffer.
635    #[derive(Debug)]
636    pub struct ReadManagedAt<S> {
637        pub(crate) fd: S,
638        pub(crate) offset: u64,
639        buffer_group: u16,
640        len: u32,
641        _p: PhantomPinned,
642    }
643
644    impl<S> ReadManagedAt<S> {
645        /// Create [`ReadManagedAt`].
646        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
647            #[cfg(fusion)]
648            let buffer_pool = buffer_pool.as_io_uring();
649            Ok(Self {
650                fd,
651                offset,
652                buffer_group: buffer_pool.buffer_group(),
653                len: len.try_into().map_err(|_| {
654                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
655                })?,
656                _p: PhantomPinned,
657            })
658        }
659    }
660
661    impl<S: AsFd> OpCode for ReadManagedAt<S> {
662        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
663            let fd = Fd(self.fd.as_fd().as_raw_fd());
664            let offset = self.offset;
665            opcode::Read::new(fd, ptr::null_mut(), self.len)
666                .offset(offset)
667                .buf_group(self.buffer_group)
668                .build()
669                .flags(Flags::BUFFER_SELECT)
670                .into()
671        }
672    }
673
674    impl<S> TakeBuffer for ReadManagedAt<S> {
675        type Buffer<'a> = BorrowedBuffer<'a>;
676        type BufferPool = BufferPool;
677
678        fn take_buffer(
679            self,
680            buffer_pool: &Self::BufferPool,
681            result: io::Result<usize>,
682            flags: u32,
683        ) -> io::Result<Self::Buffer<'_>> {
684            #[cfg(fusion)]
685            let buffer_pool = buffer_pool.as_io_uring();
686            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
687            // Safety: result is valid
688            let res = unsafe { buffer_pool.get_buffer(flags, result) };
689            #[cfg(fusion)]
690            let res = res.map(BorrowedBuffer::new_io_uring);
691            res
692        }
693    }
694
695    /// Receive data from remote.
696    pub struct RecvManaged<S> {
697        fd: S,
698        buffer_group: u16,
699        len: u32,
700        _p: PhantomPinned,
701    }
702
703    impl<S> RecvManaged<S> {
704        /// Create [`RecvBufferPool`].
705        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
706            #[cfg(fusion)]
707            let buffer_pool = buffer_pool.as_io_uring();
708            Ok(Self {
709                fd,
710                buffer_group: buffer_pool.buffer_group(),
711                len: len.try_into().map_err(|_| {
712                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
713                })?,
714                _p: PhantomPinned,
715            })
716        }
717    }
718
719    impl<S: AsFd> OpCode for RecvManaged<S> {
720        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
721            let fd = self.fd.as_fd().as_raw_fd();
722            opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
723                .buf_group(self.buffer_group)
724                .build()
725                .flags(Flags::BUFFER_SELECT)
726                .into()
727        }
728    }
729
730    impl<S> TakeBuffer for RecvManaged<S> {
731        type Buffer<'a> = BorrowedBuffer<'a>;
732        type BufferPool = BufferPool;
733
734        fn take_buffer(
735            self,
736            buffer_pool: &Self::BufferPool,
737            result: io::Result<usize>,
738            flags: u32,
739        ) -> io::Result<Self::Buffer<'_>> {
740            #[cfg(fusion)]
741            let buffer_pool = buffer_pool.as_io_uring();
742            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
743            // Safety: result is valid
744            let res = unsafe { buffer_pool.get_buffer(flags, result) };
745            #[cfg(fusion)]
746            let res = res.map(BorrowedBuffer::new_io_uring);
747            res
748        }
749    }
750}
751
752#[cfg(io_uring)]
753pub use buf_ring::{ReadManagedAt, RecvManaged};
754
755#[cfg(not(io_uring))]
756mod fallback {
757    use std::pin::Pin;
758
759    use super::OpCode;
760    use crate::{AsFd, OpEntry, op::managed::*};
761
762    impl<S: AsFd> OpCode for ReadManagedAt<S> {
763        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
764            unsafe { self.map_unchecked_mut(|this| &mut this.op) }.create_entry()
765        }
766    }
767
768    impl<S: AsFd> OpCode for RecvManaged<S> {
769        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
770            unsafe { self.map_unchecked_mut(|this| &mut this.op) }.create_entry()
771        }
772    }
773}