compio_driver/iour/
op.rs

1use std::{
2    ffi::CString,
3    io,
4    marker::PhantomPinned,
5    os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6    pin::Pin,
7};
8
9use compio_buf::{
10    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
11};
12use io_uring::{
13    opcode,
14    types::{Fd, FsyncFlags},
15};
16use socket2::{SockAddr, SockAddrStorage, socklen_t};
17
18use super::OpCode;
19pub use crate::unix::op::*;
20use crate::{OpEntry, op::*, syscall};
21
22impl<
23    D: std::marker::Send + 'static,
24    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
25> OpCode for Asyncify<F, D>
26{
27    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
28        OpEntry::Blocking
29    }
30
31    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
32        // SAFETY: self won't be moved
33        let this = unsafe { self.get_unchecked_mut() };
34        let f = this
35            .f
36            .take()
37            .expect("the operate method could only be called once");
38        let BufResult(res, data) = f();
39        this.data = Some(data);
40        res
41    }
42}
43
44impl<
45    S,
46    D: std::marker::Send + 'static,
47    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
48> OpCode for AsyncifyFd<S, F, D>
49{
50    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
51        OpEntry::Blocking
52    }
53
54    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
55        // SAFETY: self won't be moved
56        let this = unsafe { self.get_unchecked_mut() };
57        let f = this
58            .f
59            .take()
60            .expect("the operate method could only be called once");
61        let BufResult(res, data) = f(&this.fd);
62        this.data = Some(data);
63        res
64    }
65}
66
67impl OpCode for OpenFile {
68    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
69        opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
70            .flags(self.flags | libc::O_CLOEXEC)
71            .mode(self.mode)
72            .build()
73            .into()
74    }
75}
76
77impl OpCode for CloseFile {
78    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
79        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
80            .build()
81            .into()
82    }
83}
84
85/// Get metadata of an opened file.
86pub struct FileStat<S> {
87    pub(crate) fd: S,
88    pub(crate) stat: Statx,
89}
90
91impl<S> FileStat<S> {
92    /// Create [`FileStat`].
93    pub fn new(fd: S) -> Self {
94        Self {
95            fd,
96            stat: unsafe { std::mem::zeroed() },
97        }
98    }
99}
100
101impl<S: AsFd> OpCode for FileStat<S> {
102    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
103        let this = unsafe { self.get_unchecked_mut() };
104        static EMPTY_NAME: &[u8] = b"\0";
105        opcode::Statx::new(
106            Fd(this.fd.as_fd().as_fd().as_raw_fd()),
107            EMPTY_NAME.as_ptr().cast(),
108            std::ptr::addr_of_mut!(this.stat).cast(),
109        )
110        .flags(libc::AT_EMPTY_PATH)
111        .build()
112        .into()
113    }
114}
115
116impl<S> IntoInner for FileStat<S> {
117    type Inner = libc::stat;
118
119    fn into_inner(self) -> Self::Inner {
120        statx_to_stat(self.stat)
121    }
122}
123
124/// Get metadata from path.
125pub struct PathStat {
126    pub(crate) path: CString,
127    pub(crate) stat: Statx,
128    pub(crate) follow_symlink: bool,
129}
130
131impl PathStat {
132    /// Create [`PathStat`].
133    pub fn new(path: CString, follow_symlink: bool) -> Self {
134        Self {
135            path,
136            stat: unsafe { std::mem::zeroed() },
137            follow_symlink,
138        }
139    }
140}
141
142impl OpCode for PathStat {
143    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
144        let mut flags = libc::AT_EMPTY_PATH;
145        if !self.follow_symlink {
146            flags |= libc::AT_SYMLINK_NOFOLLOW;
147        }
148        opcode::Statx::new(
149            Fd(libc::AT_FDCWD),
150            self.path.as_ptr(),
151            std::ptr::addr_of_mut!(self.stat).cast(),
152        )
153        .flags(flags)
154        .build()
155        .into()
156    }
157}
158
159impl IntoInner for PathStat {
160    type Inner = libc::stat;
161
162    fn into_inner(self) -> Self::Inner {
163        statx_to_stat(self.stat)
164    }
165}
166
167impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
168    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
169        let fd = Fd(self.fd.as_fd().as_raw_fd());
170        let offset = self.offset;
171        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
172        opcode::Read::new(fd, slice.as_mut_ptr() as _, slice.len() as _)
173            .offset(offset)
174            .build()
175            .into()
176    }
177}
178
179impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
180    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
181        let this = unsafe { self.get_unchecked_mut() };
182        this.slices = unsafe { this.buffer.io_slices_mut() };
183        opcode::Readv::new(
184            Fd(this.fd.as_fd().as_raw_fd()),
185            this.slices.as_ptr() as _,
186            this.slices.len() as _,
187        )
188        .offset(this.offset)
189        .build()
190        .into()
191    }
192}
193
194impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
195    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
196        let slice = self.buffer.as_slice();
197        opcode::Write::new(
198            Fd(self.fd.as_fd().as_raw_fd()),
199            slice.as_ptr(),
200            slice.len() as _,
201        )
202        .offset(self.offset)
203        .build()
204        .into()
205    }
206}
207
208impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
209    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
210        let this = unsafe { self.get_unchecked_mut() };
211        this.slices = unsafe { this.buffer.io_slices() };
212        opcode::Writev::new(
213            Fd(this.fd.as_fd().as_raw_fd()),
214            this.slices.as_ptr() as _,
215            this.slices.len() as _,
216        )
217        .offset(this.offset)
218        .build()
219        .into()
220    }
221}
222
223impl<S: AsFd> OpCode for Sync<S> {
224    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
225        opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
226            .flags(if self.datasync {
227                FsyncFlags::DATASYNC
228            } else {
229                FsyncFlags::empty()
230            })
231            .build()
232            .into()
233    }
234}
235
236impl OpCode for Unlink {
237    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
238        opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
239            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
240            .build()
241            .into()
242    }
243}
244
245impl OpCode for CreateDir {
246    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
247        opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
248            .mode(self.mode)
249            .build()
250            .into()
251    }
252}
253
254impl OpCode for Rename {
255    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
256        opcode::RenameAt::new(
257            Fd(libc::AT_FDCWD),
258            self.old_path.as_ptr(),
259            Fd(libc::AT_FDCWD),
260            self.new_path.as_ptr(),
261        )
262        .build()
263        .into()
264    }
265}
266
267impl OpCode for Symlink {
268    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
269        opcode::SymlinkAt::new(
270            Fd(libc::AT_FDCWD),
271            self.source.as_ptr(),
272            self.target.as_ptr(),
273        )
274        .build()
275        .into()
276    }
277}
278
279impl OpCode for HardLink {
280    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
281        opcode::LinkAt::new(
282            Fd(libc::AT_FDCWD),
283            self.source.as_ptr(),
284            Fd(libc::AT_FDCWD),
285            self.target.as_ptr(),
286        )
287        .build()
288        .into()
289    }
290}
291
292impl OpCode for CreateSocket {
293    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
294        if super::is_op_supported(opcode::Socket::CODE) {
295            opcode::Socket::new(
296                self.domain,
297                self.socket_type | libc::SOCK_CLOEXEC,
298                self.protocol,
299            )
300            .build()
301            .into()
302        } else {
303            OpEntry::Blocking
304        }
305    }
306
307    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
308        Ok(syscall!(libc::socket(
309            self.domain,
310            self.socket_type | libc::SOCK_CLOEXEC,
311            self.protocol
312        ))? as _)
313    }
314}
315
316impl<S: AsFd> OpCode for ShutdownSocket<S> {
317    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
318        opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
319            .build()
320            .into()
321    }
322}
323
324impl OpCode for CloseSocket {
325    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
326        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
327            .build()
328            .into()
329    }
330}
331
332impl<S: AsFd> OpCode for Accept<S> {
333    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
334        let this = unsafe { self.get_unchecked_mut() };
335        opcode::Accept::new(
336            Fd(this.fd.as_fd().as_raw_fd()),
337            unsafe { this.buffer.view_as::<libc::sockaddr>() },
338            &mut this.addr_len,
339        )
340        .flags(libc::SOCK_CLOEXEC)
341        .build()
342        .into()
343    }
344
345    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
346        // SAFETY:
347        // * self is pinned
348        // * fd is a valid fd returned from kernel
349        unsafe {
350            self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
351        }
352    }
353}
354
355impl<S: AsFd> OpCode for Connect<S> {
356    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
357        opcode::Connect::new(
358            Fd(self.fd.as_fd().as_raw_fd()),
359            self.addr.as_ptr().cast(),
360            self.addr.len(),
361        )
362        .build()
363        .into()
364    }
365}
366
367impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
368    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
369        let fd = self.fd.as_fd().as_raw_fd();
370        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
371        opcode::Read::new(Fd(fd), slice.as_mut_ptr() as _, slice.len() as _)
372            .build()
373            .into()
374    }
375}
376
377impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
378    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
379        let this = unsafe { self.get_unchecked_mut() };
380        this.slices = unsafe { this.buffer.io_slices_mut() };
381        opcode::Readv::new(
382            Fd(this.fd.as_fd().as_raw_fd()),
383            this.slices.as_ptr() as _,
384            this.slices.len() as _,
385        )
386        .build()
387        .into()
388    }
389}
390
391impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
392    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
393        let slice = self.buffer.as_slice();
394        opcode::Write::new(
395            Fd(self.fd.as_fd().as_raw_fd()),
396            slice.as_ptr(),
397            slice.len() as _,
398        )
399        .build()
400        .into()
401    }
402}
403
404impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
405    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
406        let this = unsafe { self.get_unchecked_mut() };
407        this.slices = unsafe { this.buffer.io_slices() };
408        opcode::Writev::new(
409            Fd(this.fd.as_fd().as_raw_fd()),
410            this.slices.as_ptr() as _,
411            this.slices.len() as _,
412        )
413        .build()
414        .into()
415    }
416}
417
418struct RecvFromHeader<S> {
419    pub(crate) fd: S,
420    pub(crate) addr: SockAddrStorage,
421    pub(crate) msg: libc::msghdr,
422    _p: PhantomPinned,
423}
424
425impl<S> RecvFromHeader<S> {
426    pub fn new(fd: S) -> Self {
427        Self {
428            fd,
429            addr: SockAddrStorage::zeroed(),
430            msg: unsafe { std::mem::zeroed() },
431            _p: PhantomPinned,
432        }
433    }
434}
435
436impl<S: AsFd> RecvFromHeader<S> {
437    pub fn create_entry(&mut self, slices: &mut [IoSliceMut]) -> OpEntry {
438        self.msg.msg_name = &mut self.addr as *mut _ as _;
439        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
440        self.msg.msg_iov = slices.as_mut_ptr() as _;
441        self.msg.msg_iovlen = slices.len() as _;
442        opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
443            .build()
444            .into()
445    }
446
447    pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
448        (self.addr, self.msg.msg_namelen)
449    }
450}
451
452/// Receive data and source address.
453pub struct RecvFrom<T: IoBufMut, S> {
454    header: RecvFromHeader<S>,
455    buffer: T,
456    slice: [IoSliceMut; 1],
457}
458
459impl<T: IoBufMut, S> RecvFrom<T, S> {
460    /// Create [`RecvFrom`].
461    pub fn new(fd: S, buffer: T) -> Self {
462        Self {
463            header: RecvFromHeader::new(fd),
464            buffer,
465            // SAFETY: We never use this slice.
466            slice: [unsafe { IoSliceMut::from_slice(&mut []) }],
467        }
468    }
469}
470
471impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
472    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
473        let this = unsafe { self.get_unchecked_mut() };
474        this.slice[0] = unsafe { this.buffer.as_io_slice_mut() };
475        this.header.create_entry(&mut this.slice)
476    }
477}
478
479impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
480    type Inner = (T, SockAddrStorage, socklen_t);
481
482    fn into_inner(self) -> Self::Inner {
483        let (addr, addr_len) = self.header.into_addr();
484        (self.buffer, addr, addr_len)
485    }
486}
487
488/// Receive data and source address into vectored buffer.
489pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
490    header: RecvFromHeader<S>,
491    buffer: T,
492    slice: Vec<IoSliceMut>,
493}
494
495impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
496    /// Create [`RecvFromVectored`].
497    pub fn new(fd: S, buffer: T) -> Self {
498        Self {
499            header: RecvFromHeader::new(fd),
500            buffer,
501            slice: vec![],
502        }
503    }
504}
505
506impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
507    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
508        let this = unsafe { self.get_unchecked_mut() };
509        this.slice = unsafe { this.buffer.io_slices_mut() };
510        this.header.create_entry(&mut this.slice)
511    }
512}
513
514impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
515    type Inner = (T, SockAddrStorage, socklen_t);
516
517    fn into_inner(self) -> Self::Inner {
518        let (addr, addr_len) = self.header.into_addr();
519        (self.buffer, addr, addr_len)
520    }
521}
522
523struct SendToHeader<S> {
524    pub(crate) fd: S,
525    pub(crate) addr: SockAddr,
526    pub(crate) msg: libc::msghdr,
527    _p: PhantomPinned,
528}
529
530impl<S> SendToHeader<S> {
531    pub fn new(fd: S, addr: SockAddr) -> Self {
532        Self {
533            fd,
534            addr,
535            msg: unsafe { std::mem::zeroed() },
536            _p: PhantomPinned,
537        }
538    }
539}
540
541impl<S: AsFd> SendToHeader<S> {
542    pub fn create_entry(&mut self, slices: &mut [IoSlice]) -> OpEntry {
543        self.msg.msg_name = self.addr.as_ptr() as _;
544        self.msg.msg_namelen = self.addr.len();
545        self.msg.msg_iov = slices.as_mut_ptr() as _;
546        self.msg.msg_iovlen = slices.len() as _;
547        opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
548            .build()
549            .into()
550    }
551}
552
553/// Send data to specified address.
554pub struct SendTo<T: IoBuf, S> {
555    header: SendToHeader<S>,
556    buffer: T,
557    slice: [IoSlice; 1],
558}
559
560impl<T: IoBuf, S> SendTo<T, S> {
561    /// Create [`SendTo`].
562    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
563        Self {
564            header: SendToHeader::new(fd, addr),
565            buffer,
566            // SAFETY: We never use this slice.
567            slice: [unsafe { IoSlice::from_slice(&[]) }],
568        }
569    }
570}
571
572impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
573    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
574        let this = unsafe { self.get_unchecked_mut() };
575        this.slice[0] = unsafe { this.buffer.as_io_slice() };
576        this.header.create_entry(&mut this.slice)
577    }
578}
579
580impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
581    type Inner = T;
582
583    fn into_inner(self) -> Self::Inner {
584        self.buffer
585    }
586}
587
588/// Send data to specified address from vectored buffer.
589pub struct SendToVectored<T: IoVectoredBuf, S> {
590    header: SendToHeader<S>,
591    buffer: T,
592    slice: Vec<IoSlice>,
593}
594
595impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
596    /// Create [`SendToVectored`].
597    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
598        Self {
599            header: SendToHeader::new(fd, addr),
600            buffer,
601            slice: vec![],
602        }
603    }
604}
605
606impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
607    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
608        let this = unsafe { self.get_unchecked_mut() };
609        this.slice = unsafe { this.buffer.io_slices() };
610        this.header.create_entry(&mut this.slice)
611    }
612}
613
614impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
615    type Inner = T;
616
617    fn into_inner(self) -> Self::Inner {
618        self.buffer
619    }
620}
621
622impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
623    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
624        let this = unsafe { self.get_unchecked_mut() };
625        unsafe { this.set_msg() };
626        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &mut this.msg)
627            .build()
628            .into()
629    }
630}
631
632impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
633    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
634        let this = unsafe { self.get_unchecked_mut() };
635        unsafe { this.set_msg() };
636        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &this.msg)
637            .build()
638            .into()
639    }
640}
641
642impl<S: AsFd> OpCode for PollOnce<S> {
643    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
644        let flags = match self.interest {
645            Interest::Readable => libc::POLLIN,
646            Interest::Writable => libc::POLLOUT,
647        };
648        opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
649            .build()
650            .into()
651    }
652}
653
654mod buf_ring {
655    use std::{
656        io,
657        marker::PhantomPinned,
658        os::fd::{AsFd, AsRawFd},
659        pin::Pin,
660        ptr,
661    };
662
663    use io_uring::{opcode, squeue::Flags, types::Fd};
664
665    use super::OpCode;
666    use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
667
668    /// Read a file at specified position into specified buffer.
669    #[derive(Debug)]
670    pub struct ReadManagedAt<S> {
671        pub(crate) fd: S,
672        pub(crate) offset: u64,
673        buffer_group: u16,
674        len: u32,
675        _p: PhantomPinned,
676    }
677
678    impl<S> ReadManagedAt<S> {
679        /// Create [`ReadManagedAt`].
680        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
681            #[cfg(fusion)]
682            let buffer_pool = buffer_pool.as_io_uring();
683            Ok(Self {
684                fd,
685                offset,
686                buffer_group: buffer_pool.buffer_group(),
687                len: len.try_into().map_err(|_| {
688                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
689                })?,
690                _p: PhantomPinned,
691            })
692        }
693    }
694
695    impl<S: AsFd> OpCode for ReadManagedAt<S> {
696        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
697            let fd = Fd(self.fd.as_fd().as_raw_fd());
698            let offset = self.offset;
699            opcode::Read::new(fd, ptr::null_mut(), self.len)
700                .offset(offset)
701                .buf_group(self.buffer_group)
702                .build()
703                .flags(Flags::BUFFER_SELECT)
704                .into()
705        }
706    }
707
708    impl<S> TakeBuffer for ReadManagedAt<S> {
709        type Buffer<'a> = BorrowedBuffer<'a>;
710        type BufferPool = BufferPool;
711
712        fn take_buffer(
713            self,
714            buffer_pool: &Self::BufferPool,
715            result: io::Result<usize>,
716            flags: u32,
717        ) -> io::Result<Self::Buffer<'_>> {
718            #[cfg(fusion)]
719            let buffer_pool = buffer_pool.as_io_uring();
720            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
721            // SAFETY: result is valid
722            let res = unsafe { buffer_pool.get_buffer(flags, result) };
723            #[cfg(fusion)]
724            let res = res.map(BorrowedBuffer::new_io_uring);
725            res
726        }
727    }
728
729    /// Receive data from remote.
730    pub struct RecvManaged<S> {
731        fd: S,
732        buffer_group: u16,
733        len: u32,
734        _p: PhantomPinned,
735    }
736
737    impl<S> RecvManaged<S> {
738        /// Create [`RecvBufferPool`].
739        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
740            #[cfg(fusion)]
741            let buffer_pool = buffer_pool.as_io_uring();
742            Ok(Self {
743                fd,
744                buffer_group: buffer_pool.buffer_group(),
745                len: len.try_into().map_err(|_| {
746                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
747                })?,
748                _p: PhantomPinned,
749            })
750        }
751    }
752
753    impl<S: AsFd> OpCode for RecvManaged<S> {
754        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
755            let fd = self.fd.as_fd().as_raw_fd();
756            opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
757                .buf_group(self.buffer_group)
758                .build()
759                .flags(Flags::BUFFER_SELECT)
760                .into()
761        }
762    }
763
764    impl<S> TakeBuffer for RecvManaged<S> {
765        type Buffer<'a> = BorrowedBuffer<'a>;
766        type BufferPool = BufferPool;
767
768        fn take_buffer(
769            self,
770            buffer_pool: &Self::BufferPool,
771            result: io::Result<usize>,
772            flags: u32,
773        ) -> io::Result<Self::Buffer<'_>> {
774            #[cfg(fusion)]
775            let buffer_pool = buffer_pool.as_io_uring();
776            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
777            // SAFETY: result is valid
778            let res = unsafe { buffer_pool.get_buffer(flags, result) };
779            #[cfg(fusion)]
780            let res = res.map(BorrowedBuffer::new_io_uring);
781            res
782        }
783    }
784}
785
786pub use buf_ring::{ReadManagedAt, RecvManaged};