1use std::{
2 ffi::CString,
3 io,
4 marker::PhantomPinned,
5 os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6 pin::Pin,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
10use io_uring::{
11 opcode,
12 types::{Fd, FsyncFlags},
13};
14use pin_project_lite::pin_project;
15use socket2::{SockAddr, SockAddrStorage, socklen_t};
16
17use super::OpCode;
18pub use crate::sys::unix_op::*;
19use crate::{OpEntry, op::*, sys_slice::*, syscall};
20
21impl<
22 D: std::marker::Send + 'static,
23 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
24> OpCode for Asyncify<F, D>
25{
26 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
27 OpEntry::Blocking
28 }
29
30 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
31 let this = self.project();
32 let f = this
33 .f
34 .take()
35 .expect("the operate method could only be called once");
36 let BufResult(res, data) = f();
37 *this.data = Some(data);
38 res
39 }
40}
41
42impl<
43 S,
44 D: std::marker::Send + 'static,
45 F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
46> OpCode for AsyncifyFd<S, F, D>
47{
48 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
49 OpEntry::Blocking
50 }
51
52 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
53 let this = self.project();
54 let f = this
55 .f
56 .take()
57 .expect("the operate method could only be called once");
58 let BufResult(res, data) = f(this.fd);
59 *this.data = Some(data);
60 res
61 }
62}
63
64impl OpCode for OpenFile {
65 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
66 opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
67 .flags(self.flags | libc::O_CLOEXEC)
68 .mode(self.mode)
69 .build()
70 .into()
71 }
72}
73
74impl OpCode for CloseFile {
75 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
76 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
77 .build()
78 .into()
79 }
80}
81
82impl<S: AsFd> OpCode for TruncateFile<S> {
83 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
84 if super::is_op_supported(opcode::Ftruncate::CODE) {
85 return opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
86 .build()
87 .into();
88 }
89
90 OpEntry::Blocking
91 }
92
93 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
94 self.truncate()
95 }
96}
97
pin_project! {
    /// Operation that queries metadata for an already opened descriptor via `statx`.
    pub struct FileStat<S> {
        // Descriptor (or descriptor wrapper) whose metadata is requested.
        pub(crate) fd: S,
        // Output storage whose address is handed to the `Statx` entry; converted
        // to `Stat` by `into_inner`.
        pub(crate) stat: Statx,
    }
}
105
106impl<S> FileStat<S> {
107 pub fn new(fd: S) -> Self {
109 Self {
110 fd,
111 stat: unsafe { std::mem::zeroed() },
112 }
113 }
114}
115
116impl<S: AsFd> OpCode for FileStat<S> {
117 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
118 let this = self.project();
119 static EMPTY_NAME: &[u8] = b"\0";
120 opcode::Statx::new(
121 Fd(this.fd.as_fd().as_fd().as_raw_fd()),
122 EMPTY_NAME.as_ptr().cast(),
123 this.stat as *mut _ as _,
124 )
125 .flags(libc::AT_EMPTY_PATH)
126 .mask(statx_mask())
127 .build()
128 .into()
129 }
130}
131
132impl<S> IntoInner for FileStat<S> {
133 type Inner = Stat;
134
135 fn into_inner(self) -> Self::Inner {
136 statx_to_stat(self.stat)
137 }
138}
139
/// Operation that queries metadata for a path via `statx`.
pub struct PathStat {
    /// NUL-terminated path passed to the `Statx` entry.
    pub(crate) path: CString,
    /// Output storage whose address is handed to the `Statx` entry; converted
    /// to `Stat` by `into_inner`.
    pub(crate) stat: Statx,
    /// When `false`, `AT_SYMLINK_NOFOLLOW` is added to the request flags.
    pub(crate) follow_symlink: bool,
}
146
147impl PathStat {
148 pub fn new(path: CString, follow_symlink: bool) -> Self {
150 Self {
151 path,
152 stat: unsafe { std::mem::zeroed() },
153 follow_symlink,
154 }
155 }
156}
157
158impl OpCode for PathStat {
159 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
160 let mut flags = libc::AT_EMPTY_PATH;
161 if !self.follow_symlink {
162 flags |= libc::AT_SYMLINK_NOFOLLOW;
163 }
164 opcode::Statx::new(
165 Fd(libc::AT_FDCWD),
166 self.path.as_ptr(),
167 std::ptr::addr_of_mut!(self.stat).cast(),
168 )
169 .flags(flags)
170 .mask(statx_mask())
171 .build()
172 .into()
173 }
174}
175
176impl IntoInner for PathStat {
177 type Inner = Stat;
178
179 fn into_inner(self) -> Self::Inner {
180 statx_to_stat(self.stat)
181 }
182}
183
184impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
185 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
186 let this = self.project();
187 let fd = Fd(this.fd.as_fd().as_raw_fd());
188 let slice = this.buffer.sys_slice_mut();
189 opcode::Read::new(
190 fd,
191 slice.ptr() as _,
192 slice.len().try_into().unwrap_or(u32::MAX),
193 )
194 .offset(*this.offset)
195 .build()
196 .into()
197 }
198}
199
200impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
201 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
202 let this = self.project();
203 *this.slices = this.buffer.sys_slices_mut();
204 opcode::Readv::new(
205 Fd(this.fd.as_fd().as_raw_fd()),
206 this.slices.as_ptr() as _,
207 this.slices.len().try_into().unwrap_or(u32::MAX),
208 )
209 .offset(*this.offset)
210 .build()
211 .into()
212 }
213}
214
215impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
216 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
217 let slice = self.buffer.as_init();
218 opcode::Write::new(
219 Fd(self.fd.as_fd().as_raw_fd()),
220 slice.as_ptr(),
221 slice.len().try_into().unwrap_or(u32::MAX),
222 )
223 .offset(self.offset)
224 .build()
225 .into()
226 }
227}
228
229impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
230 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
231 let this = self.project();
232 *this.slices = this.buffer.as_ref().sys_slices();
233 opcode::Writev::new(
234 Fd(this.fd.as_fd().as_raw_fd()),
235 this.slices.as_ptr() as _,
236 this.slices.len().try_into().unwrap_or(u32::MAX),
237 )
238 .offset(*this.offset)
239 .build()
240 .into()
241 }
242}
243
244impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
245 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
246 let fd = self.fd.as_fd().as_raw_fd();
247 let slice = self.project().buffer.sys_slice_mut();
248 opcode::Read::new(
249 Fd(fd),
250 slice.ptr() as _,
251 slice.len().try_into().unwrap_or(u32::MAX),
252 )
253 .build()
254 .into()
255 }
256}
257
258impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
259 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
260 let this = self.project();
261 *this.slices = this.buffer.sys_slices_mut();
262 opcode::Readv::new(
263 Fd(this.fd.as_fd().as_raw_fd()),
264 this.slices.as_ptr() as _,
265 this.slices.len().try_into().unwrap_or(u32::MAX),
266 )
267 .build()
268 .into()
269 }
270}
271
272impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
273 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
274 let slice = self.buffer.as_init();
275 opcode::Write::new(
276 Fd(self.fd.as_fd().as_raw_fd()),
277 slice.as_ptr(),
278 slice.len().try_into().unwrap_or(u32::MAX),
279 )
280 .build()
281 .into()
282 }
283}
284
285impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
286 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
287 let this = self.project();
288 *this.slices = this.buffer.as_ref().sys_slices();
289 opcode::Writev::new(
290 Fd(this.fd.as_fd().as_raw_fd()),
291 this.slices.as_ptr() as _,
292 this.slices.len().try_into().unwrap_or(u32::MAX),
293 )
294 .build()
295 .into()
296 }
297}
298
299impl<S: AsFd> OpCode for Sync<S> {
300 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
301 opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
302 .flags(if self.datasync {
303 FsyncFlags::DATASYNC
304 } else {
305 FsyncFlags::empty()
306 })
307 .build()
308 .into()
309 }
310}
311
312impl OpCode for Unlink {
313 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
314 opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
315 .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
316 .build()
317 .into()
318 }
319}
320
321impl OpCode for CreateDir {
322 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
323 opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
324 .mode(self.mode)
325 .build()
326 .into()
327 }
328}
329
330impl OpCode for Rename {
331 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
332 opcode::RenameAt::new(
333 Fd(libc::AT_FDCWD),
334 self.old_path.as_ptr(),
335 Fd(libc::AT_FDCWD),
336 self.new_path.as_ptr(),
337 )
338 .build()
339 .into()
340 }
341}
342
343impl OpCode for Symlink {
344 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
345 opcode::SymlinkAt::new(
346 Fd(libc::AT_FDCWD),
347 self.source.as_ptr(),
348 self.target.as_ptr(),
349 )
350 .build()
351 .into()
352 }
353}
354
355impl OpCode for HardLink {
356 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
357 opcode::LinkAt::new(
358 Fd(libc::AT_FDCWD),
359 self.source.as_ptr(),
360 Fd(libc::AT_FDCWD),
361 self.target.as_ptr(),
362 )
363 .build()
364 .into()
365 }
366}
367
368impl OpCode for CreateSocket {
369 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
370 if super::is_op_supported(opcode::Socket::CODE) {
371 opcode::Socket::new(
372 self.domain,
373 self.socket_type | libc::SOCK_CLOEXEC,
374 self.protocol,
375 )
376 .build()
377 .into()
378 } else {
379 OpEntry::Blocking
380 }
381 }
382
383 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
384 Ok(syscall!(libc::socket(
385 self.domain,
386 self.socket_type | libc::SOCK_CLOEXEC,
387 self.protocol
388 ))? as _)
389 }
390}
391
392impl<S: AsFd> OpCode for ShutdownSocket<S> {
393 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
394 opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
395 .build()
396 .into()
397 }
398}
399
400impl OpCode for CloseSocket {
401 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
402 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
403 .build()
404 .into()
405 }
406}
407
408impl<S: AsFd> OpCode for Accept<S> {
409 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
410 let this = self.project();
411 opcode::Accept::new(
412 Fd(this.fd.as_fd().as_raw_fd()),
413 unsafe { this.buffer.view_as::<libc::sockaddr>() },
414 this.addr_len,
415 )
416 .flags(libc::SOCK_CLOEXEC)
417 .build()
418 .into()
419 }
420
421 unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
422 let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
424 *self.project().accepted_fd = Some(fd);
425 }
426}
427
428impl<S: AsFd> OpCode for Connect<S> {
429 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
430 opcode::Connect::new(
431 Fd(self.fd.as_fd().as_raw_fd()),
432 self.addr.as_ptr().cast(),
433 self.addr.len(),
434 )
435 .build()
436 .into()
437 }
438}
439
440impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
441 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
442 let fd = self.fd.as_fd().as_raw_fd();
443 let flags = self.flags;
444 let slice = self.project().buffer.sys_slice_mut();
445 opcode::Recv::new(
446 Fd(fd),
447 slice.ptr() as _,
448 slice.len().try_into().unwrap_or(u32::MAX),
449 )
450 .flags(flags)
451 .build()
452 .into()
453 }
454}
455
456impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
457 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
458 self.as_mut().set_msg();
459 let this = self.project();
460 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
461 .flags(*this.flags as _)
462 .build()
463 .into()
464 }
465}
466
467impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
468 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
469 let slice = self.buffer.as_init();
470 opcode::Send::new(
471 Fd(self.fd.as_fd().as_raw_fd()),
472 slice.as_ptr(),
473 slice.len().try_into().unwrap_or(u32::MAX),
474 )
475 .flags(self.flags)
476 .build()
477 .into()
478 }
479}
480
481impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
482 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
483 self.as_mut().set_msg();
484 let this = self.project();
485 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
486 .flags(*this.flags as _)
487 .build()
488 .into()
489 }
490}
491
/// State shared by the `recvfrom`-style operations: descriptor, peer-address
/// storage and the `msghdr` that points at it.
struct RecvFromHeader<S> {
    /// Socket to receive from.
    pub(crate) fd: S,
    /// Storage that `msg.msg_name` points at once `create_entry` has run.
    pub(crate) addr: SockAddrStorage,
    /// Message header whose address is passed to the `RecvMsg` entry.
    pub(crate) msg: libc::msghdr,
    /// Flags forwarded to the `RecvMsg` entry.
    pub(crate) flags: i32,
    /// Opts the type out of `Unpin`; `msg` stores a pointer into `addr`.
    _p: PhantomPinned,
}
499
500impl<S> RecvFromHeader<S> {
501 pub fn new(fd: S, flags: i32) -> Self {
502 Self {
503 fd,
504 addr: SockAddrStorage::zeroed(),
505 msg: unsafe { std::mem::zeroed() },
506 flags,
507 _p: PhantomPinned,
508 }
509 }
510}
511
512impl<S: AsFd> RecvFromHeader<S> {
513 pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
514 self.msg.msg_name = &mut self.addr as *mut _ as _;
515 self.msg.msg_namelen = self.addr.size_of() as _;
516 self.msg.msg_iov = slices.as_mut_ptr() as _;
517 self.msg.msg_iovlen = slices.len() as _;
518 opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
519 .flags(self.flags as _)
520 .build()
521 .into()
522 }
523
524 pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
525 (self.addr, self.msg.msg_namelen)
526 }
527}
528
pin_project! {
    /// Operation that receives into a single buffer and records the sender address.
    pub struct RecvFrom<T: IoBufMut, S> {
        // Descriptor, address storage and message header.
        header: RecvFromHeader<S>,
        // Destination buffer.
        #[pin]
        buffer: T,
        // Set by `create_entry`; the message header's iovec pointer refers to it.
        slice: Option<SysSlice>,
    }
}
538
539impl<T: IoBufMut, S> RecvFrom<T, S> {
540 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
542 Self {
543 header: RecvFromHeader::new(fd, flags),
544 buffer,
545 slice: None,
546 }
547 }
548}
549
550impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
551 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
552 let this = self.project();
553 let slice = this.slice.insert(this.buffer.sys_slice_mut());
554 this.header.create_entry(std::slice::from_mut(slice))
555 }
556}
557
558impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
559 type Inner = (T, SockAddrStorage, socklen_t);
560
561 fn into_inner(self) -> Self::Inner {
562 let (addr, addr_len) = self.header.into_addr();
563 (self.buffer, addr, addr_len)
564 }
565}
566
pin_project! {
    /// Operation that receives into a vectored buffer and records the sender address.
    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
        // Descriptor, address storage and message header.
        header: RecvFromHeader<S>,
        // Destination buffers.
        #[pin]
        buffer: T,
        // Filled by `create_entry`; the message header's iovec pointer refers to it.
        slice: Vec<SysSlice>,
    }
}
576
577impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
578 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
580 Self {
581 header: RecvFromHeader::new(fd, flags),
582 buffer,
583 slice: vec![],
584 }
585 }
586}
587
588impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
589 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
590 let this = self.project();
591 *this.slice = this.buffer.sys_slices_mut();
592 this.header.create_entry(this.slice)
593 }
594}
595
596impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
597 type Inner = (T, SockAddrStorage, socklen_t);
598
599 fn into_inner(self) -> Self::Inner {
600 let (addr, addr_len) = self.header.into_addr();
601 (self.buffer, addr, addr_len)
602 }
603}
604
/// State shared by the `sendto`-style operations: descriptor, destination
/// address and the `msghdr` that points at it.
struct SendToHeader<S> {
    /// Socket to send on.
    pub(crate) fd: S,
    /// Destination address that `msg.msg_name` points at once `create_entry`
    /// has run.
    pub(crate) addr: SockAddr,
    /// Message header whose address is passed to the `SendMsg` entry.
    pub(crate) msg: libc::msghdr,
    /// Flags forwarded to the `SendMsg` entry.
    pub(crate) flags: i32,
    /// Opts the type out of `Unpin`; `msg` stores a pointer into `addr`.
    _p: PhantomPinned,
}
612
613impl<S> SendToHeader<S> {
614 pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
615 Self {
616 fd,
617 addr,
618 msg: unsafe { std::mem::zeroed() },
619 flags,
620 _p: PhantomPinned,
621 }
622 }
623}
624
625impl<S: AsFd> SendToHeader<S> {
626 pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
627 self.msg.msg_name = self.addr.as_ptr() as _;
628 self.msg.msg_namelen = self.addr.len();
629 self.msg.msg_iov = slices.as_mut_ptr() as _;
630 self.msg.msg_iovlen = slices.len() as _;
631 opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
632 .flags(self.flags as _)
633 .build()
634 .into()
635 }
636}
637
pin_project! {
    /// Operation that sends a single buffer to a specified address.
    pub struct SendTo<T: IoBuf, S> {
        // Descriptor, destination address and message header.
        header: SendToHeader<S>,
        // Source buffer.
        #[pin]
        buffer: T,
        // Set by `create_entry`; the message header's iovec pointer refers to it.
        slice: Option<SysSlice>,
    }
}
647
648impl<T: IoBuf, S> SendTo<T, S> {
649 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
651 Self {
652 header: SendToHeader::new(fd, addr, flags),
653 buffer,
654 slice: None,
655 }
656 }
657}
658
659impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
660 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
661 let this = self.project();
662 let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
663 this.header.create_entry(std::slice::from_mut(slice))
664 }
665}
666
667impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
668 type Inner = T;
669
670 fn into_inner(self) -> Self::Inner {
671 self.buffer
672 }
673}
674
pin_project! {
    /// Operation that sends a vectored buffer to a specified address.
    pub struct SendToVectored<T: IoVectoredBuf, S> {
        // Descriptor, destination address and message header.
        header: SendToHeader<S>,
        // Source buffers.
        #[pin]
        buffer: T,
        // Filled by `create_entry`; the message header's iovec pointer refers to it.
        slice: Vec<SysSlice>,
    }
}
684
685impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
686 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
688 Self {
689 header: SendToHeader::new(fd, addr, flags),
690 buffer,
691 slice: vec![],
692 }
693 }
694}
695
696impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
697 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
698 let this = self.project();
699 *this.slice = this.buffer.as_ref().sys_slices();
700 this.header.create_entry(this.slice)
701 }
702}
703
704impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
705 type Inner = T;
706
707 fn into_inner(self) -> Self::Inner {
708 self.buffer
709 }
710}
711
712impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
713 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
714 self.as_mut().set_msg();
715 let this = self.project();
716 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
717 .flags(*this.flags as _)
718 .build()
719 .into()
720 }
721}
722
723impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
724 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
725 self.as_mut().set_msg();
726 let this = self.project();
727 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
728 .flags(*this.flags as _)
729 .build()
730 .into()
731 }
732}
733
734impl<S: AsFd> OpCode for PollOnce<S> {
735 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
736 let flags = match self.interest {
737 Interest::Readable => libc::POLLIN,
738 Interest::Writable => libc::POLLOUT,
739 };
740 opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
741 .build()
742 .into()
743 }
744}
745
746impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
747 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
748 opcode::Splice::new(
749 Fd(self.fd_in.as_fd().as_raw_fd()),
750 self.offset_in,
751 Fd(self.fd_out.as_fd().as_raw_fd()),
752 self.offset_out,
753 self.len.try_into().unwrap_or(u32::MAX),
754 )
755 .flags(self.flags)
756 .build()
757 .into()
758 }
759}
760
761mod buf_ring {
762 use std::{
763 io,
764 marker::PhantomPinned,
765 os::fd::{AsFd, AsRawFd},
766 pin::Pin,
767 ptr,
768 };
769
770 use io_uring::{opcode, squeue::Flags, types::Fd};
771
772 use super::OpCode;
773 use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
774
775 #[derive(Debug)]
777 pub struct ReadManagedAt<S> {
778 pub(crate) fd: S,
779 pub(crate) offset: u64,
780 buffer_group: u16,
781 len: u32,
782 _p: PhantomPinned,
783 }
784
785 impl<S> ReadManagedAt<S> {
786 pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
788 #[cfg(fusion)]
789 let buffer_pool = buffer_pool.as_io_uring();
790 Ok(Self {
791 fd,
792 offset,
793 buffer_group: buffer_pool.buffer_group(),
794 len: len.try_into().map_err(|_| {
795 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
796 })?,
797 _p: PhantomPinned,
798 })
799 }
800 }
801
802 impl<S: AsFd> OpCode for ReadManagedAt<S> {
803 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
804 let fd = Fd(self.fd.as_fd().as_raw_fd());
805 let offset = self.offset;
806 opcode::Read::new(fd, ptr::null_mut(), self.len)
807 .offset(offset)
808 .buf_group(self.buffer_group)
809 .build()
810 .flags(Flags::BUFFER_SELECT)
811 .into()
812 }
813 }
814
815 impl<S> TakeBuffer for ReadManagedAt<S> {
816 type Buffer<'a> = BorrowedBuffer<'a>;
817 type BufferPool = BufferPool;
818
819 fn take_buffer(
820 self,
821 buffer_pool: &Self::BufferPool,
822 result: io::Result<usize>,
823 buffer_id: u16,
824 ) -> io::Result<Self::Buffer<'_>> {
825 #[cfg(fusion)]
826 let buffer_pool = buffer_pool.as_io_uring();
827 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
828 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
830 #[cfg(fusion)]
831 let res = res.map(BorrowedBuffer::new_io_uring);
832 res
833 }
834 }
835
836 pub struct ReadManaged<S> {
838 fd: S,
839 buffer_group: u16,
840 len: u32,
841 _p: PhantomPinned,
842 }
843
844 impl<S> ReadManaged<S> {
845 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
847 #[cfg(fusion)]
848 let buffer_pool = buffer_pool.as_io_uring();
849 Ok(Self {
850 fd,
851 buffer_group: buffer_pool.buffer_group(),
852 len: len.try_into().map_err(|_| {
853 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
854 })?,
855 _p: PhantomPinned,
856 })
857 }
858 }
859
860 impl<S: AsFd> OpCode for ReadManaged<S> {
861 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
862 let fd = self.fd.as_fd().as_raw_fd();
863 opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
864 .buf_group(self.buffer_group)
865 .build()
866 .flags(Flags::BUFFER_SELECT)
867 .into()
868 }
869 }
870
871 impl<S> TakeBuffer for ReadManaged<S> {
872 type Buffer<'a> = BorrowedBuffer<'a>;
873 type BufferPool = BufferPool;
874
875 fn take_buffer(
876 self,
877 buffer_pool: &Self::BufferPool,
878 result: io::Result<usize>,
879 buffer_id: u16,
880 ) -> io::Result<Self::Buffer<'_>> {
881 #[cfg(fusion)]
882 let buffer_pool = buffer_pool.as_io_uring();
883 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
884 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
886 #[cfg(fusion)]
887 let res = res.map(BorrowedBuffer::new_io_uring);
888 res
889 }
890 }
891
892 pub struct RecvManaged<S> {
894 fd: S,
895 buffer_group: u16,
896 len: u32,
897 flags: i32,
898 _p: PhantomPinned,
899 }
900
901 impl<S> RecvManaged<S> {
902 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
904 #[cfg(fusion)]
905 let buffer_pool = buffer_pool.as_io_uring();
906 Ok(Self {
907 fd,
908 buffer_group: buffer_pool.buffer_group(),
909 len: len.try_into().map_err(|_| {
910 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
911 })?,
912 flags,
913 _p: PhantomPinned,
914 })
915 }
916 }
917
918 impl<S: AsFd> OpCode for RecvManaged<S> {
919 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
920 let fd = self.fd.as_fd().as_raw_fd();
921 opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
922 .flags(self.flags)
923 .buf_group(self.buffer_group)
924 .build()
925 .flags(Flags::BUFFER_SELECT)
926 .into()
927 }
928 }
929
930 impl<S> TakeBuffer for RecvManaged<S> {
931 type Buffer<'a> = BorrowedBuffer<'a>;
932 type BufferPool = BufferPool;
933
934 fn take_buffer(
935 self,
936 buffer_pool: &Self::BufferPool,
937 result: io::Result<usize>,
938 buffer_id: u16,
939 ) -> io::Result<Self::Buffer<'_>> {
940 #[cfg(fusion)]
941 let buffer_pool = buffer_pool.as_io_uring();
942 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
943 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
945 #[cfg(fusion)]
946 let res = res.map(BorrowedBuffer::new_io_uring);
947 res
948 }
949 }
950}
951
952pub use buf_ring::{ReadManaged, ReadManagedAt, RecvManaged};