1use std::{
2 ffi::CString,
3 io,
4 marker::PhantomPinned,
5 os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6 pin::Pin,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
10use io_uring::{
11 opcode,
12 types::{Fd, FsyncFlags},
13};
14use pin_project_lite::pin_project;
15use socket2::{SockAddr, SockAddrStorage, socklen_t};
16
17use super::OpCode;
18pub use crate::sys::unix_op::*;
19use crate::{OpEntry, op::*, sys_slice::*, syscall};
20
21unsafe impl<
22 D: std::marker::Send + 'static,
23 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
24> OpCode for Asyncify<F, D>
25{
26 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
27 OpEntry::Blocking
28 }
29
30 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
31 let this = self.project();
32 let f = this
33 .f
34 .take()
35 .expect("the operate method could only be called once");
36 let BufResult(res, data) = f();
37 *this.data = Some(data);
38 res
39 }
40}
41
42unsafe impl<
43 S,
44 D: std::marker::Send + 'static,
45 F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
46> OpCode for AsyncifyFd<S, F, D>
47{
48 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
49 OpEntry::Blocking
50 }
51
52 fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
53 let this = self.project();
54 let f = this
55 .f
56 .take()
57 .expect("the operate method could only be called once");
58 let BufResult(res, data) = f(this.fd);
59 *this.data = Some(data);
60 res
61 }
62}
63
64unsafe impl OpCode for OpenFile {
65 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
66 opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
67 .flags(self.flags | libc::O_CLOEXEC)
68 .mode(self.mode)
69 .build()
70 .into()
71 }
72
73 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
74 self.call()
75 }
76}
77
78unsafe impl OpCode for CloseFile {
79 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
80 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
81 .build()
82 .into()
83 }
84
85 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
86 self.call()
87 }
88}
89
90unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
91 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
92 opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
93 .build()
94 .into()
95 }
96
97 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
98 self.call()
99 }
100}
101
102pin_project! {
103 pub struct FileStat<S> {
105 pub(crate) fd: S,
106 pub(crate) stat: Statx,
107 }
108}
109
110impl<S> FileStat<S> {
111 pub fn new(fd: S) -> Self {
113 Self {
114 fd,
115 stat: unsafe { std::mem::zeroed() },
116 }
117 }
118}
119
120unsafe impl<S: AsFd> OpCode for FileStat<S> {
121 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
122 let this = self.project();
123 static EMPTY_NAME: &[u8] = b"\0";
124 opcode::Statx::new(
125 Fd(this.fd.as_fd().as_fd().as_raw_fd()),
126 EMPTY_NAME.as_ptr().cast(),
127 this.stat as *mut _ as _,
128 )
129 .flags(libc::AT_EMPTY_PATH)
130 .mask(statx_mask())
131 .build()
132 .into()
133 }
134
135 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
136 let this = self.project();
137 static EMPTY_NAME: &[u8] = b"\0";
138 let res = syscall!(libc::statx(
139 this.fd.as_fd().as_raw_fd(),
140 EMPTY_NAME.as_ptr().cast(),
141 libc::AT_EMPTY_PATH,
142 statx_mask(),
143 this.stat as *mut _ as _
144 ))?;
145 Ok(res as _)
146 }
147}
148
149impl<S> IntoInner for FileStat<S> {
150 type Inner = Stat;
151
152 fn into_inner(self) -> Self::Inner {
153 statx_to_stat(self.stat)
154 }
155}
156
157pub struct PathStat {
159 pub(crate) path: CString,
160 pub(crate) stat: Statx,
161 pub(crate) follow_symlink: bool,
162}
163
164impl PathStat {
165 pub fn new(path: CString, follow_symlink: bool) -> Self {
167 Self {
168 path,
169 stat: unsafe { std::mem::zeroed() },
170 follow_symlink,
171 }
172 }
173}
174
175unsafe impl OpCode for PathStat {
176 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
177 let mut flags = libc::AT_EMPTY_PATH;
178 if !self.follow_symlink {
179 flags |= libc::AT_SYMLINK_NOFOLLOW;
180 }
181 opcode::Statx::new(
182 Fd(libc::AT_FDCWD),
183 self.path.as_ptr(),
184 std::ptr::addr_of_mut!(self.stat).cast(),
185 )
186 .flags(flags)
187 .mask(statx_mask())
188 .build()
189 .into()
190 }
191
192 fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
193 let mut flags = libc::AT_EMPTY_PATH;
194 if !self.follow_symlink {
195 flags |= libc::AT_SYMLINK_NOFOLLOW;
196 }
197 let res = syscall!(libc::statx(
198 libc::AT_FDCWD,
199 self.path.as_ptr(),
200 flags,
201 statx_mask(),
202 std::ptr::addr_of_mut!(self.stat).cast()
203 ))?;
204 Ok(res as _)
205 }
206}
207
208impl IntoInner for PathStat {
209 type Inner = Stat;
210
211 fn into_inner(self) -> Self::Inner {
212 statx_to_stat(self.stat)
213 }
214}
215
216unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
217 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
218 let this = self.project();
219 let fd = Fd(this.fd.as_fd().as_raw_fd());
220 let slice = this.buffer.sys_slice_mut();
221 opcode::Read::new(
222 fd,
223 slice.ptr() as _,
224 slice.len().try_into().unwrap_or(u32::MAX),
225 )
226 .offset(*this.offset)
227 .build()
228 .into()
229 }
230}
231
232unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
233 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
234 let this = self.project();
235 *this.slices = this.buffer.sys_slices_mut();
236 opcode::Readv::new(
237 Fd(this.fd.as_fd().as_raw_fd()),
238 this.slices.as_ptr() as _,
239 this.slices.len().try_into().unwrap_or(u32::MAX),
240 )
241 .offset(*this.offset)
242 .build()
243 .into()
244 }
245}
246
247unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
248 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
249 let slice = self.buffer.as_init();
250 opcode::Write::new(
251 Fd(self.fd.as_fd().as_raw_fd()),
252 slice.as_ptr(),
253 slice.len().try_into().unwrap_or(u32::MAX),
254 )
255 .offset(self.offset)
256 .build()
257 .into()
258 }
259}
260
261unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
262 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
263 let this = self.project();
264 *this.slices = this.buffer.as_ref().sys_slices();
265 opcode::Writev::new(
266 Fd(this.fd.as_fd().as_raw_fd()),
267 this.slices.as_ptr() as _,
268 this.slices.len().try_into().unwrap_or(u32::MAX),
269 )
270 .offset(*this.offset)
271 .build()
272 .into()
273 }
274}
275
276unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
277 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
278 let fd = self.fd.as_fd().as_raw_fd();
279 let slice = self.project().buffer.sys_slice_mut();
280 opcode::Read::new(
281 Fd(fd),
282 slice.ptr() as _,
283 slice.len().try_into().unwrap_or(u32::MAX),
284 )
285 .build()
286 .into()
287 }
288}
289
290unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
291 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
292 let this = self.project();
293 *this.slices = this.buffer.sys_slices_mut();
294 opcode::Readv::new(
295 Fd(this.fd.as_fd().as_raw_fd()),
296 this.slices.as_ptr() as _,
297 this.slices.len().try_into().unwrap_or(u32::MAX),
298 )
299 .build()
300 .into()
301 }
302}
303
304unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
305 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
306 let slice = self.buffer.as_init();
307 opcode::Write::new(
308 Fd(self.fd.as_fd().as_raw_fd()),
309 slice.as_ptr(),
310 slice.len().try_into().unwrap_or(u32::MAX),
311 )
312 .build()
313 .into()
314 }
315}
316
317unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
318 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
319 let this = self.project();
320 *this.slices = this.buffer.as_ref().sys_slices();
321 opcode::Writev::new(
322 Fd(this.fd.as_fd().as_raw_fd()),
323 this.slices.as_ptr() as _,
324 this.slices.len().try_into().unwrap_or(u32::MAX),
325 )
326 .build()
327 .into()
328 }
329}
330
331unsafe impl<S: AsFd> OpCode for Sync<S> {
332 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
333 opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
334 .flags(if self.datasync {
335 FsyncFlags::DATASYNC
336 } else {
337 FsyncFlags::empty()
338 })
339 .build()
340 .into()
341 }
342}
343
344unsafe impl OpCode for Unlink {
345 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
346 opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
347 .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
348 .build()
349 .into()
350 }
351
352 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
353 self.call()
354 }
355}
356
357unsafe impl OpCode for CreateDir {
358 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
359 opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
360 .mode(self.mode)
361 .build()
362 .into()
363 }
364
365 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
366 self.call()
367 }
368}
369
370unsafe impl OpCode for Rename {
371 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
372 opcode::RenameAt::new(
373 Fd(libc::AT_FDCWD),
374 self.old_path.as_ptr(),
375 Fd(libc::AT_FDCWD),
376 self.new_path.as_ptr(),
377 )
378 .build()
379 .into()
380 }
381
382 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
383 self.call()
384 }
385}
386
387unsafe impl OpCode for Symlink {
388 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
389 opcode::SymlinkAt::new(
390 Fd(libc::AT_FDCWD),
391 self.source.as_ptr(),
392 self.target.as_ptr(),
393 )
394 .build()
395 .into()
396 }
397
398 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
399 self.call()
400 }
401}
402
403unsafe impl OpCode for HardLink {
404 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
405 opcode::LinkAt::new(
406 Fd(libc::AT_FDCWD),
407 self.source.as_ptr(),
408 Fd(libc::AT_FDCWD),
409 self.target.as_ptr(),
410 )
411 .build()
412 .into()
413 }
414
415 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
416 self.call()
417 }
418}
419
420unsafe impl OpCode for CreateSocket {
421 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
422 opcode::Socket::new(
423 self.domain,
424 self.socket_type | libc::SOCK_CLOEXEC,
425 self.protocol,
426 )
427 .build()
428 .into()
429 }
430
431 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
432 Ok(syscall!(libc::socket(
433 self.domain,
434 self.socket_type | libc::SOCK_CLOEXEC,
435 self.protocol
436 ))? as _)
437 }
438}
439
440unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
441 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
442 opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
443 .build()
444 .into()
445 }
446
447 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
448 self.call()
449 }
450}
451
452unsafe impl OpCode for CloseSocket {
453 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
454 opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
455 .build()
456 .into()
457 }
458
459 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
460 self.call()
461 }
462}
463
464unsafe impl<S: AsFd> OpCode for Accept<S> {
465 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
466 let this = self.project();
467 opcode::Accept::new(
468 Fd(this.fd.as_fd().as_raw_fd()),
469 unsafe { this.buffer.view_as::<libc::sockaddr>() },
470 this.addr_len,
471 )
472 .flags(libc::SOCK_CLOEXEC)
473 .build()
474 .into()
475 }
476
477 unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
478 let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
480 *self.project().accepted_fd = Some(fd);
481 }
482}
483
484unsafe impl<S: AsFd> OpCode for Connect<S> {
485 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
486 opcode::Connect::new(
487 Fd(self.fd.as_fd().as_raw_fd()),
488 self.addr.as_ptr().cast(),
489 self.addr.len(),
490 )
491 .build()
492 .into()
493 }
494}
495
496unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
497 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
498 let fd = self.fd.as_fd().as_raw_fd();
499 let flags = self.flags;
500 let slice = self.project().buffer.sys_slice_mut();
501 opcode::Recv::new(
502 Fd(fd),
503 slice.ptr() as _,
504 slice.len().try_into().unwrap_or(u32::MAX),
505 )
506 .flags(flags)
507 .build()
508 .into()
509 }
510}
511
512unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
513 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
514 self.as_mut().set_msg();
515 let this = self.project();
516 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
517 .flags(*this.flags as _)
518 .build()
519 .into()
520 }
521}
522
523unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
524 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
525 let slice = self.buffer.as_init();
526 opcode::Send::new(
527 Fd(self.fd.as_fd().as_raw_fd()),
528 slice.as_ptr(),
529 slice.len().try_into().unwrap_or(u32::MAX),
530 )
531 .flags(self.flags)
532 .build()
533 .into()
534 }
535}
536
537unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
538 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
539 self.as_mut().set_msg();
540 let this = self.project();
541 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
542 .flags(*this.flags as _)
543 .build()
544 .into()
545 }
546}
547
548struct RecvFromHeader<S> {
549 pub(crate) fd: S,
550 pub(crate) addr: SockAddrStorage,
551 pub(crate) msg: libc::msghdr,
552 pub(crate) flags: i32,
553 _p: PhantomPinned,
554}
555
556impl<S> RecvFromHeader<S> {
557 pub fn new(fd: S, flags: i32) -> Self {
558 Self {
559 fd,
560 addr: SockAddrStorage::zeroed(),
561 msg: unsafe { std::mem::zeroed() },
562 flags,
563 _p: PhantomPinned,
564 }
565 }
566}
567
568impl<S: AsFd> RecvFromHeader<S> {
569 pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
570 self.msg.msg_name = &mut self.addr as *mut _ as _;
571 self.msg.msg_namelen = self.addr.size_of() as _;
572 self.msg.msg_iov = slices.as_mut_ptr() as _;
573 self.msg.msg_iovlen = slices.len() as _;
574 opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
575 .flags(self.flags as _)
576 .build()
577 .into()
578 }
579
580 pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
581 (self.addr, self.msg.msg_namelen)
582 }
583}
584
585pin_project! {
586 pub struct RecvFrom<T: IoBufMut, S> {
588 header: RecvFromHeader<S>,
589 #[pin]
590 buffer: T,
591 slice: Option<SysSlice>,
592 }
593}
594
595impl<T: IoBufMut, S> RecvFrom<T, S> {
596 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
598 Self {
599 header: RecvFromHeader::new(fd, flags),
600 buffer,
601 slice: None,
602 }
603 }
604}
605
606unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
607 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
608 let this = self.project();
609 let slice = this.slice.insert(this.buffer.sys_slice_mut());
610 this.header.create_entry(std::slice::from_mut(slice))
611 }
612}
613
614impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
615 type Inner = (T, SockAddrStorage, socklen_t);
616
617 fn into_inner(self) -> Self::Inner {
618 let (addr, addr_len) = self.header.into_addr();
619 (self.buffer, addr, addr_len)
620 }
621}
622
623pin_project! {
624 pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
626 header: RecvFromHeader<S>,
627 #[pin]
628 buffer: T,
629 slice: Vec<SysSlice>,
630 }
631}
632
633impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
634 pub fn new(fd: S, buffer: T, flags: i32) -> Self {
636 Self {
637 header: RecvFromHeader::new(fd, flags),
638 buffer,
639 slice: vec![],
640 }
641 }
642}
643
644unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
645 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
646 let this = self.project();
647 *this.slice = this.buffer.sys_slices_mut();
648 this.header.create_entry(this.slice)
649 }
650}
651
652impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
653 type Inner = (T, SockAddrStorage, socklen_t);
654
655 fn into_inner(self) -> Self::Inner {
656 let (addr, addr_len) = self.header.into_addr();
657 (self.buffer, addr, addr_len)
658 }
659}
660
661struct SendToHeader<S> {
662 pub(crate) fd: S,
663 pub(crate) addr: SockAddr,
664 pub(crate) msg: libc::msghdr,
665 pub(crate) flags: i32,
666 _p: PhantomPinned,
667}
668
669impl<S> SendToHeader<S> {
670 pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
671 Self {
672 fd,
673 addr,
674 msg: unsafe { std::mem::zeroed() },
675 flags,
676 _p: PhantomPinned,
677 }
678 }
679}
680
681impl<S: AsFd> SendToHeader<S> {
682 pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
683 self.msg.msg_name = self.addr.as_ptr() as _;
684 self.msg.msg_namelen = self.addr.len();
685 self.msg.msg_iov = slices.as_mut_ptr() as _;
686 self.msg.msg_iovlen = slices.len() as _;
687 opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
688 .flags(self.flags as _)
689 .build()
690 .into()
691 }
692}
693
694pin_project! {
695 pub struct SendTo<T: IoBuf, S> {
697 header: SendToHeader<S>,
698 #[pin]
699 buffer: T,
700 slice: Option<SysSlice>,
701 }
702}
703
704impl<T: IoBuf, S> SendTo<T, S> {
705 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
707 Self {
708 header: SendToHeader::new(fd, addr, flags),
709 buffer,
710 slice: None,
711 }
712 }
713}
714
715unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
716 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
717 let this = self.project();
718 let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
719 this.header.create_entry(std::slice::from_mut(slice))
720 }
721}
722
723impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
724 type Inner = T;
725
726 fn into_inner(self) -> Self::Inner {
727 self.buffer
728 }
729}
730
731pin_project! {
732 pub struct SendToVectored<T: IoVectoredBuf, S> {
734 header: SendToHeader<S>,
735 #[pin]
736 buffer: T,
737 slice: Vec<SysSlice>,
738 }
739}
740
741impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
742 pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
744 Self {
745 header: SendToHeader::new(fd, addr, flags),
746 buffer,
747 slice: vec![],
748 }
749 }
750}
751
752unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
753 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
754 let this = self.project();
755 *this.slice = this.buffer.as_ref().sys_slices();
756 this.header.create_entry(this.slice)
757 }
758}
759
760impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
761 type Inner = T;
762
763 fn into_inner(self) -> Self::Inner {
764 self.buffer
765 }
766}
767
768unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
769 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
770 self.as_mut().set_msg();
771 let this = self.project();
772 opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
773 .flags(*this.flags as _)
774 .build()
775 .into()
776 }
777}
778
779unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
780 fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
781 self.as_mut().set_msg();
782 let this = self.project();
783 opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
784 .flags(*this.flags as _)
785 .build()
786 .into()
787 }
788}
789
790unsafe impl<S: AsFd> OpCode for PollOnce<S> {
791 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
792 let flags = match self.interest {
793 Interest::Readable => libc::POLLIN,
794 Interest::Writable => libc::POLLOUT,
795 };
796 opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
797 .build()
798 .into()
799 }
800}
801
802unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
803 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
804 opcode::Splice::new(
805 Fd(self.fd_in.as_fd().as_raw_fd()),
806 self.offset_in,
807 Fd(self.fd_out.as_fd().as_raw_fd()),
808 self.offset_out,
809 self.len.try_into().unwrap_or(u32::MAX),
810 )
811 .flags(self.flags)
812 .build()
813 .into()
814 }
815
816 fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
817 let mut offset_in = self.offset_in;
818 let mut offset_out = self.offset_out;
819 let offset_in_ptr = if offset_in < 0 {
820 std::ptr::null_mut()
821 } else {
822 &mut offset_in
823 };
824 let offset_out_ptr = if offset_out < 0 {
825 std::ptr::null_mut()
826 } else {
827 &mut offset_out
828 };
829 Ok(syscall!(libc::splice(
830 self.fd_in.as_fd().as_raw_fd(),
831 offset_in_ptr,
832 self.fd_out.as_fd().as_raw_fd(),
833 offset_out_ptr,
834 self.len,
835 self.flags,
836 ))? as _)
837 }
838}
839
840mod buf_ring {
841 use std::{
842 io,
843 marker::PhantomPinned,
844 os::fd::{AsFd, AsRawFd},
845 pin::Pin,
846 ptr,
847 };
848
849 use io_uring::{opcode, squeue::Flags, types::Fd};
850
851 use super::OpCode;
852 use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
853
854 #[derive(Debug)]
856 pub struct ReadManagedAt<S> {
857 pub(crate) fd: S,
858 pub(crate) offset: u64,
859 buffer_group: u16,
860 len: u32,
861 _p: PhantomPinned,
862 }
863
864 impl<S> ReadManagedAt<S> {
865 pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
867 #[cfg(fusion)]
868 let buffer_pool = buffer_pool.as_io_uring();
869 Ok(Self {
870 fd,
871 offset,
872 buffer_group: buffer_pool.buffer_group(),
873 len: len.try_into().map_err(|_| {
874 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
875 })?,
876 _p: PhantomPinned,
877 })
878 }
879 }
880
881 unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
882 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
883 let fd = Fd(self.fd.as_fd().as_raw_fd());
884 let offset = self.offset;
885 opcode::Read::new(fd, ptr::null_mut(), self.len)
886 .offset(offset)
887 .buf_group(self.buffer_group)
888 .build()
889 .flags(Flags::BUFFER_SELECT)
890 .into()
891 }
892 }
893
894 impl<S> TakeBuffer for ReadManagedAt<S> {
895 type Buffer<'a> = BorrowedBuffer<'a>;
896 type BufferPool = BufferPool;
897
898 fn take_buffer(
899 self,
900 buffer_pool: &Self::BufferPool,
901 result: io::Result<usize>,
902 buffer_id: u16,
903 ) -> io::Result<Self::Buffer<'_>> {
904 #[cfg(fusion)]
905 let buffer_pool = buffer_pool.as_io_uring();
906 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
907 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
909 #[cfg(fusion)]
910 let res = res.map(BorrowedBuffer::new_io_uring);
911 res
912 }
913 }
914
915 pub struct ReadManaged<S> {
917 fd: S,
918 buffer_group: u16,
919 len: u32,
920 _p: PhantomPinned,
921 }
922
923 impl<S> ReadManaged<S> {
924 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
926 #[cfg(fusion)]
927 let buffer_pool = buffer_pool.as_io_uring();
928 Ok(Self {
929 fd,
930 buffer_group: buffer_pool.buffer_group(),
931 len: len.try_into().map_err(|_| {
932 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
933 })?,
934 _p: PhantomPinned,
935 })
936 }
937 }
938
939 unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
940 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
941 let fd = self.fd.as_fd().as_raw_fd();
942 opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
943 .buf_group(self.buffer_group)
944 .build()
945 .flags(Flags::BUFFER_SELECT)
946 .into()
947 }
948 }
949
950 impl<S> TakeBuffer for ReadManaged<S> {
951 type Buffer<'a> = BorrowedBuffer<'a>;
952 type BufferPool = BufferPool;
953
954 fn take_buffer(
955 self,
956 buffer_pool: &Self::BufferPool,
957 result: io::Result<usize>,
958 buffer_id: u16,
959 ) -> io::Result<Self::Buffer<'_>> {
960 #[cfg(fusion)]
961 let buffer_pool = buffer_pool.as_io_uring();
962 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
963 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
965 #[cfg(fusion)]
966 let res = res.map(BorrowedBuffer::new_io_uring);
967 res
968 }
969 }
970
971 pub struct RecvManaged<S> {
973 fd: S,
974 buffer_group: u16,
975 len: u32,
976 flags: i32,
977 _p: PhantomPinned,
978 }
979
980 impl<S> RecvManaged<S> {
981 pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
983 #[cfg(fusion)]
984 let buffer_pool = buffer_pool.as_io_uring();
985 Ok(Self {
986 fd,
987 buffer_group: buffer_pool.buffer_group(),
988 len: len.try_into().map_err(|_| {
989 io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
990 })?,
991 flags,
992 _p: PhantomPinned,
993 })
994 }
995 }
996
997 unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
998 fn create_entry(self: Pin<&mut Self>) -> OpEntry {
999 let fd = self.fd.as_fd().as_raw_fd();
1000 opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
1001 .flags(self.flags)
1002 .buf_group(self.buffer_group)
1003 .build()
1004 .flags(Flags::BUFFER_SELECT)
1005 .into()
1006 }
1007 }
1008
1009 impl<S> TakeBuffer for RecvManaged<S> {
1010 type Buffer<'a> = BorrowedBuffer<'a>;
1011 type BufferPool = BufferPool;
1012
1013 fn take_buffer(
1014 self,
1015 buffer_pool: &Self::BufferPool,
1016 result: io::Result<usize>,
1017 buffer_id: u16,
1018 ) -> io::Result<Self::Buffer<'_>> {
1019 #[cfg(fusion)]
1020 let buffer_pool = buffer_pool.as_io_uring();
1021 let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1022 let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
1024 #[cfg(fusion)]
1025 let res = res.map(BorrowedBuffer::new_io_uring);
1026 res
1027 }
1028 }
1029}
1030
1031pub use buf_ring::{ReadManaged, ReadManagedAt, RecvManaged};