1use std::{
2 ffi::CString,
3 io,
4 marker::PhantomPinned,
5 os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6 pin::Pin,
7};
8
9use compio_buf::{
10 BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
11};
12use io_uring::{
13 opcode,
14 types::{Fd, FsyncFlags},
15};
16use socket2::{SockAddr, SockAddrStorage, socklen_t};
17
18use super::OpCode;
19pub use crate::unix::op::*;
20use crate::{OpEntry, op::*, syscall};
21
22impl<
23 D: std::marker::Send + 'static,
24 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
25> OpCode for Asyncify<F, D>
26{
27 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
28 OpEntry::Blocking
29 }
30
31 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
32 let this = unsafe { self.get_unchecked_mut() };
34 let f = this
35 .f
36 .take()
37 .expect("the operate method could only be called once");
38 let BufResult(res, data) = f();
39 this.data = Some(data);
40 res
41 }
42}
43
44impl<
45 S,
46 D: std::marker::Send + 'static,
47 F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
48> OpCode for AsyncifyFd<S, F, D>
49{
50 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
51 OpEntry::Blocking
52 }
53
54 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
55 let this = unsafe { self.get_unchecked_mut() };
57 let f = this
58 .f
59 .take()
60 .expect("the operate method could only be called once");
61 let BufResult(res, data) = f(&this.fd);
62 this.data = Some(data);
63 res
64 }
65}
66
67impl OpCode for OpenFile {
68 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
69 opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
70 .flags(self.flags | libc::O_CLOEXEC)
71 .mode(self.mode)
72 .build()
73 .into()
74 }
75}
76
77impl OpCode for CloseFile {
78 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
79 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
80 .build()
81 .into()
82 }
83}
84
85pub struct FileStat<S> {
87 pub(crate) fd: S,
88 pub(crate) stat: Statx,
89}
90
91impl<S> FileStat<S> {
92 pub fn new(fd: S) -> Self {
94 Self {
95 fd,
96 stat: unsafe { std::mem::zeroed() },
97 }
98 }
99}
100
101impl<S: AsFd> OpCode for FileStat<S> {
102 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
103 let this = unsafe { self.get_unchecked_mut() };
104 static EMPTY_NAME: &[u8] = b"\0";
105 opcode::Statx::new(
106 Fd(this.fd.as_fd().as_fd().as_raw_fd()),
107 EMPTY_NAME.as_ptr().cast(),
108 std::ptr::addr_of_mut!(this.stat).cast(),
109 )
110 .flags(libc::AT_EMPTY_PATH)
111 .build()
112 .into()
113 }
114}
115
116impl<S> IntoInner for FileStat<S> {
117 type Inner = libc::stat;
118
119 fn into_inner(self) -> Self::Inner {
120 statx_to_stat(self.stat)
121 }
122}
123
124pub struct PathStat {
126 pub(crate) path: CString,
127 pub(crate) stat: Statx,
128 pub(crate) follow_symlink: bool,
129}
130
131impl PathStat {
132 pub fn new(path: CString, follow_symlink: bool) -> Self {
134 Self {
135 path,
136 stat: unsafe { std::mem::zeroed() },
137 follow_symlink,
138 }
139 }
140}
141
142impl OpCode for PathStat {
143 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
144 let mut flags = libc::AT_EMPTY_PATH;
145 if !self.follow_symlink {
146 flags |= libc::AT_SYMLINK_NOFOLLOW;
147 }
148 opcode::Statx::new(
149 Fd(libc::AT_FDCWD),
150 self.path.as_ptr(),
151 std::ptr::addr_of_mut!(self.stat).cast(),
152 )
153 .flags(flags)
154 .build()
155 .into()
156 }
157}
158
159impl IntoInner for PathStat {
160 type Inner = libc::stat;
161
162 fn into_inner(self) -> Self::Inner {
163 statx_to_stat(self.stat)
164 }
165}
166
167impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
168 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
169 let fd = Fd(self.fd.as_fd().as_raw_fd());
170 let offset = self.offset;
171 let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
172 opcode::Read::new(fd, slice.as_mut_ptr() as _, slice.len() as _)
173 .offset(offset)
174 .build()
175 .into()
176 }
177}
178
179impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
180 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
181 let this = unsafe { self.get_unchecked_mut() };
182 this.slices = unsafe { this.buffer.io_slices_mut() };
183 opcode::Readv::new(
184 Fd(this.fd.as_fd().as_raw_fd()),
185 this.slices.as_ptr() as _,
186 this.slices.len() as _,
187 )
188 .offset(this.offset)
189 .build()
190 .into()
191 }
192}
193
194impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
195 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
196 let slice = self.buffer.as_slice();
197 opcode::Write::new(
198 Fd(self.fd.as_fd().as_raw_fd()),
199 slice.as_ptr(),
200 slice.len() as _,
201 )
202 .offset(self.offset)
203 .build()
204 .into()
205 }
206}
207
208impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
209 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
210 let this = unsafe { self.get_unchecked_mut() };
211 this.slices = unsafe { this.buffer.io_slices() };
212 opcode::Writev::new(
213 Fd(this.fd.as_fd().as_raw_fd()),
214 this.slices.as_ptr() as _,
215 this.slices.len() as _,
216 )
217 .offset(this.offset)
218 .build()
219 .into()
220 }
221}
222
223impl<S: AsFd> OpCode for Sync<S> {
224 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
225 opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
226 .flags(if self.datasync {
227 FsyncFlags::DATASYNC
228 } else {
229 FsyncFlags::empty()
230 })
231 .build()
232 .into()
233 }
234}
235
236impl OpCode for Unlink {
237 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
238 opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
239 .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
240 .build()
241 .into()
242 }
243}
244
245impl OpCode for CreateDir {
246 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
247 opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
248 .mode(self.mode)
249 .build()
250 .into()
251 }
252}
253
254impl OpCode for Rename {
255 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
256 opcode::RenameAt::new(
257 Fd(libc::AT_FDCWD),
258 self.old_path.as_ptr(),
259 Fd(libc::AT_FDCWD),
260 self.new_path.as_ptr(),
261 )
262 .build()
263 .into()
264 }
265}
266
267impl OpCode for Symlink {
268 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
269 opcode::SymlinkAt::new(
270 Fd(libc::AT_FDCWD),
271 self.source.as_ptr(),
272 self.target.as_ptr(),
273 )
274 .build()
275 .into()
276 }
277}
278
279impl OpCode for HardLink {
280 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
281 opcode::LinkAt::new(
282 Fd(libc::AT_FDCWD),
283 self.source.as_ptr(),
284 Fd(libc::AT_FDCWD),
285 self.target.as_ptr(),
286 )
287 .build()
288 .into()
289 }
290}
291
292impl OpCode for CreateSocket {
293 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
294 if super::is_op_supported(opcode::Socket::CODE) {
295 opcode::Socket::new(
296 self.domain,
297 self.socket_type | libc::SOCK_CLOEXEC,
298 self.protocol,
299 )
300 .build()
301 .into()
302 } else {
303 OpEntry::Blocking
304 }
305 }
306
307 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
308 Ok(syscall!(libc::socket(
309 self.domain,
310 self.socket_type | libc::SOCK_CLOEXEC,
311 self.protocol
312 ))? as _)
313 }
314}
315
316impl<S: AsFd> OpCode for ShutdownSocket<S> {
317 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
318 opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
319 .build()
320 .into()
321 }
322}
323
324impl OpCode for CloseSocket {
325 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
326 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
327 .build()
328 .into()
329 }
330}
331
332impl<S: AsFd> OpCode for Accept<S> {
333 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
334 let this = unsafe { self.get_unchecked_mut() };
335 opcode::Accept::new(
336 Fd(this.fd.as_fd().as_raw_fd()),
337 unsafe { this.buffer.view_as::<libc::sockaddr>() },
338 &mut this.addr_len,
339 )
340 .flags(libc::SOCK_CLOEXEC)
341 .build()
342 .into()
343 }
344
345 unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
346 unsafe {
350 self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
351 }
352 }
353}
354
355impl<S: AsFd> OpCode for Connect<S> {
356 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
357 opcode::Connect::new(
358 Fd(self.fd.as_fd().as_raw_fd()),
359 self.addr.as_ptr().cast(),
360 self.addr.len(),
361 )
362 .build()
363 .into()
364 }
365}
366
367impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
368 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
369 let fd = self.fd.as_fd().as_raw_fd();
370 let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
371 opcode::Read::new(Fd(fd), slice.as_mut_ptr() as _, slice.len() as _)
372 .build()
373 .into()
374 }
375}
376
377impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
378 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
379 let this = unsafe { self.get_unchecked_mut() };
380 this.slices = unsafe { this.buffer.io_slices_mut() };
381 opcode::Readv::new(
382 Fd(this.fd.as_fd().as_raw_fd()),
383 this.slices.as_ptr() as _,
384 this.slices.len() as _,
385 )
386 .build()
387 .into()
388 }
389}
390
391impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
392 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
393 let slice = self.buffer.as_slice();
394 opcode::Write::new(
395 Fd(self.fd.as_fd().as_raw_fd()),
396 slice.as_ptr(),
397 slice.len() as _,
398 )
399 .build()
400 .into()
401 }
402}
403
404impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
405 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
406 let this = unsafe { self.get_unchecked_mut() };
407 this.slices = unsafe { this.buffer.io_slices() };
408 opcode::Writev::new(
409 Fd(this.fd.as_fd().as_raw_fd()),
410 this.slices.as_ptr() as _,
411 this.slices.len() as _,
412 )
413 .build()
414 .into()
415 }
416}
417
418struct RecvFromHeader<S> {
419 pub(crate) fd: S,
420 pub(crate) addr: SockAddrStorage,
421 pub(crate) msg: libc::msghdr,
422 _p: PhantomPinned,
423}
424
425impl<S> RecvFromHeader<S> {
426 pub fn new(fd: S) -> Self {
427 Self {
428 fd,
429 addr: SockAddrStorage::zeroed(),
430 msg: unsafe { std::mem::zeroed() },
431 _p: PhantomPinned,
432 }
433 }
434}
435
436impl<S: AsFd> RecvFromHeader<S> {
437 pub fn create_entry(&mut self, slices: &mut [IoSliceMut]) -> OpEntry {
438 self.msg.msg_name = &mut self.addr as *mut _ as _;
439 self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
440 self.msg.msg_iov = slices.as_mut_ptr() as _;
441 self.msg.msg_iovlen = slices.len() as _;
442 opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
443 .build()
444 .into()
445 }
446
447 pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
448 (self.addr, self.msg.msg_namelen)
449 }
450}
451
452pub struct RecvFrom<T: IoBufMut, S> {
454 header: RecvFromHeader<S>,
455 buffer: T,
456 slice: [IoSliceMut; 1],
457}
458
459impl<T: IoBufMut, S> RecvFrom<T, S> {
460 pub fn new(fd: S, buffer: T) -> Self {
462 Self {
463 header: RecvFromHeader::new(fd),
464 buffer,
465 slice: [unsafe { IoSliceMut::from_slice(&mut []) }],
467 }
468 }
469}
470
471impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
472 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
473 let this = unsafe { self.get_unchecked_mut() };
474 this.slice[0] = unsafe { this.buffer.as_io_slice_mut() };
475 this.header.create_entry(&mut this.slice)
476 }
477}
478
479impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
480 type Inner = (T, SockAddrStorage, socklen_t);
481
482 fn into_inner(self) -> Self::Inner {
483 let (addr, addr_len) = self.header.into_addr();
484 (self.buffer, addr, addr_len)
485 }
486}
487
488pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
490 header: RecvFromHeader<S>,
491 buffer: T,
492 slice: Vec<IoSliceMut>,
493}
494
495impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
496 pub fn new(fd: S, buffer: T) -> Self {
498 Self {
499 header: RecvFromHeader::new(fd),
500 buffer,
501 slice: vec![],
502 }
503 }
504}
505
506impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
507 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
508 let this = unsafe { self.get_unchecked_mut() };
509 this.slice = unsafe { this.buffer.io_slices_mut() };
510 this.header.create_entry(&mut this.slice)
511 }
512}
513
514impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
515 type Inner = (T, SockAddrStorage, socklen_t);
516
517 fn into_inner(self) -> Self::Inner {
518 let (addr, addr_len) = self.header.into_addr();
519 (self.buffer, addr, addr_len)
520 }
521}
522
523struct SendToHeader<S> {
524 pub(crate) fd: S,
525 pub(crate) addr: SockAddr,
526 pub(crate) msg: libc::msghdr,
527 _p: PhantomPinned,
528}
529
530impl<S> SendToHeader<S> {
531 pub fn new(fd: S, addr: SockAddr) -> Self {
532 Self {
533 fd,
534 addr,
535 msg: unsafe { std::mem::zeroed() },
536 _p: PhantomPinned,
537 }
538 }
539}
540
541impl<S: AsFd> SendToHeader<S> {
542 pub fn create_entry(&mut self, slices: &mut [IoSlice]) -> OpEntry {
543 self.msg.msg_name = self.addr.as_ptr() as _;
544 self.msg.msg_namelen = self.addr.len();
545 self.msg.msg_iov = slices.as_mut_ptr() as _;
546 self.msg.msg_iovlen = slices.len() as _;
547 opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
548 .build()
549 .into()
550 }
551}
552
553pub struct SendTo<T: IoBuf, S> {
555 header: SendToHeader<S>,
556 buffer: T,
557 slice: [IoSlice; 1],
558}
559
560impl<T: IoBuf, S> SendTo<T, S> {
561 pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
563 Self {
564 header: SendToHeader::new(fd, addr),
565 buffer,
566 slice: [unsafe { IoSlice::from_slice(&[]) }],
568 }
569 }
570}
571
572impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
573 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
574 let this = unsafe { self.get_unchecked_mut() };
575 this.slice[0] = unsafe { this.buffer.as_io_slice() };
576 this.header.create_entry(&mut this.slice)
577 }
578}
579
580impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
581 type Inner = T;
582
583 fn into_inner(self) -> Self::Inner {
584 self.buffer
585 }
586}
587
588pub struct SendToVectored<T: IoVectoredBuf, S> {
590 header: SendToHeader<S>,
591 buffer: T,
592 slice: Vec<IoSlice>,
593}
594
595impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
596 pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
598 Self {
599 header: SendToHeader::new(fd, addr),
600 buffer,
601 slice: vec![],
602 }
603 }
604}
605
606impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
607 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
608 let this = unsafe { self.get_unchecked_mut() };
609 this.slice = unsafe { this.buffer.io_slices() };
610 this.header.create_entry(&mut this.slice)
611 }
612}
613
614impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
615 type Inner = T;
616
617 fn into_inner(self) -> Self::Inner {
618 self.buffer
619 }
620}
621
622impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
623 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
624 let this = unsafe { self.get_unchecked_mut() };
625 unsafe { this.set_msg() };
626 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &mut this.msg)
627 .build()
628 .into()
629 }
630}
631
632impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
633 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
634 let this = unsafe { self.get_unchecked_mut() };
635 unsafe { this.set_msg() };
636 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), &this.msg)
637 .build()
638 .into()
639 }
640}
641
642impl<S: AsFd> OpCode for PollOnce<S> {
643 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
644 let flags = match self.interest {
645 Interest::Readable => libc::POLLIN,
646 Interest::Writable => libc::POLLOUT,
647 };
648 opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
649 .build()
650 .into()
651 }
652}
653
654mod buf_ring {
655 use std::{
656 io,
657 marker::PhantomPinned,
658 os::fd::{AsFd, AsRawFd},
659 pin::Pin,
660 ptr,
661 };
662
663 use io_uring::{opcode, squeue::Flags, types::Fd};
664
665 use super::OpCode;
666 use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
667
668 #[derive(Debug)]
670 pub struct ReadManagedAt<S> {
671 pub(crate) fd: S,
672 pub(crate) offset: u64,
673 buffer_group: u16,
674 len: u32,
675 _p: PhantomPinned,
676 }
677
678 impl<S> ReadManagedAt<S> {
679 pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
681 #[cfg(fusion)]
682 let buffer_pool = buffer_pool.as_io_uring();
683 Ok(Self {
684 fd,
685 offset,
686 buffer_group: buffer_pool.buffer_group(),
687 len: len.try_into().map_err(|_| {
688 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
689 })?,
690 _p: PhantomPinned,
691 })
692 }
693 }
694
695 impl<S: AsFd> OpCode for ReadManagedAt<S> {
696 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
697 let fd = Fd(self.fd.as_fd().as_raw_fd());
698 let offset = self.offset;
699 opcode::Read::new(fd, ptr::null_mut(), self.len)
700 .offset(offset)
701 .buf_group(self.buffer_group)
702 .build()
703 .flags(Flags::BUFFER_SELECT)
704 .into()
705 }
706 }
707
708 impl<S> TakeBuffer for ReadManagedAt<S> {
709 type Buffer<'a> = BorrowedBuffer<'a>;
710 type BufferPool = BufferPool;
711
712 fn take_buffer(
713 self,
714 buffer_pool: &Self::BufferPool,
715 result: io::Result<usize>,
716 flags: u32,
717 ) -> io::Result<Self::Buffer<'_>> {
718 #[cfg(fusion)]
719 let buffer_pool = buffer_pool.as_io_uring();
720 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
721 let res = unsafe { buffer_pool.get_buffer(flags, result) };
723 #[cfg(fusion)]
724 let res = res.map(BorrowedBuffer::new_io_uring);
725 res
726 }
727 }
728
729 pub struct RecvManaged<S> {
731 fd: S,
732 buffer_group: u16,
733 len: u32,
734 _p: PhantomPinned,
735 }
736
737 impl<S> RecvManaged<S> {
738 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
740 #[cfg(fusion)]
741 let buffer_pool = buffer_pool.as_io_uring();
742 Ok(Self {
743 fd,
744 buffer_group: buffer_pool.buffer_group(),
745 len: len.try_into().map_err(|_| {
746 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
747 })?,
748 _p: PhantomPinned,
749 })
750 }
751 }
752
753 impl<S: AsFd> OpCode for RecvManaged<S> {
754 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
755 let fd = self.fd.as_fd().as_raw_fd();
756 opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
757 .buf_group(self.buffer_group)
758 .build()
759 .flags(Flags::BUFFER_SELECT)
760 .into()
761 }
762 }
763
764 impl<S> TakeBuffer for RecvManaged<S> {
765 type Buffer<'a> = BorrowedBuffer<'a>;
766 type BufferPool = BufferPool;
767
768 fn take_buffer(
769 self,
770 buffer_pool: &Self::BufferPool,
771 result: io::Result<usize>,
772 flags: u32,
773 ) -> io::Result<Self::Buffer<'_>> {
774 #[cfg(fusion)]
775 let buffer_pool = buffer_pool.as_io_uring();
776 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(flags))?;
777 let res = unsafe { buffer_pool.get_buffer(flags, result) };
779 #[cfg(fusion)]
780 let res = res.map(BorrowedBuffer::new_io_uring);
781 res
782 }
783 }
784}
785
786pub use buf_ring::{ReadManagedAt, RecvManaged};