//! io_uring implementations of `OpCode` for the driver operations
//! (`compio_driver/sys/iour/op.rs`).

1use std::{
2    ffi::CString,
3    io,
4    marker::PhantomPinned,
5    os::fd::{AsFd, AsRawFd, FromRawFd, OwnedFd},
6    pin::Pin,
7};
8
9use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
10use io_uring::{
11    opcode,
12    types::{Fd, FsyncFlags},
13};
14use pin_project_lite::pin_project;
15use socket2::{SockAddr, SockAddrStorage, socklen_t};
16
17use super::OpCode;
18pub use crate::sys::unix_op::*;
19use crate::{OpEntry, op::*, sys_slice::*, syscall};
20
21unsafe impl<
22    D: std::marker::Send + 'static,
23    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
24> OpCode for Asyncify<F, D>
25{
26    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
27        OpEntry::Blocking
28    }
29
30    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
31        let this = self.project();
32        let f = this
33            .f
34            .take()
35            .expect("the operate method could only be called once");
36        let BufResult(res, data) = f();
37        *this.data = Some(data);
38        res
39    }
40}
41
42unsafe impl<
43    S,
44    D: std::marker::Send + 'static,
45    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
46> OpCode for AsyncifyFd<S, F, D>
47{
48    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
49        OpEntry::Blocking
50    }
51
52    fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
53        let this = self.project();
54        let f = this
55            .f
56            .take()
57            .expect("the operate method could only be called once");
58        let BufResult(res, data) = f(this.fd);
59        *this.data = Some(data);
60        res
61    }
62}
63
64unsafe impl OpCode for OpenFile {
65    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
66        opcode::OpenAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
67            .flags(self.flags | libc::O_CLOEXEC)
68            .mode(self.mode)
69            .build()
70            .into()
71    }
72
73    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
74        self.call()
75    }
76}
77
78unsafe impl OpCode for CloseFile {
79    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
80        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
81            .build()
82            .into()
83    }
84
85    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
86        self.call()
87    }
88}
89
90unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
91    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
92        opcode::Ftruncate::new(Fd(self.fd.as_fd().as_raw_fd()), self.size)
93            .build()
94            .into()
95    }
96
97    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
98        self.call()
99    }
100}
101
102pin_project! {
103    /// Get metadata of an opened file.
104    pub struct FileStat<S> {
105        pub(crate) fd: S,
106        pub(crate) stat: Statx,
107    }
108}
109
110impl<S> FileStat<S> {
111    /// Create [`FileStat`].
112    pub fn new(fd: S) -> Self {
113        Self {
114            fd,
115            stat: unsafe { std::mem::zeroed() },
116        }
117    }
118}
119
120unsafe impl<S: AsFd> OpCode for FileStat<S> {
121    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
122        let this = self.project();
123        static EMPTY_NAME: &[u8] = b"\0";
124        opcode::Statx::new(
125            Fd(this.fd.as_fd().as_fd().as_raw_fd()),
126            EMPTY_NAME.as_ptr().cast(),
127            this.stat as *mut _ as _,
128        )
129        .flags(libc::AT_EMPTY_PATH)
130        .mask(statx_mask())
131        .build()
132        .into()
133    }
134
135    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
136        let this = self.project();
137        static EMPTY_NAME: &[u8] = b"\0";
138        let res = syscall!(libc::statx(
139            this.fd.as_fd().as_raw_fd(),
140            EMPTY_NAME.as_ptr().cast(),
141            libc::AT_EMPTY_PATH,
142            statx_mask(),
143            this.stat as *mut _ as _
144        ))?;
145        Ok(res as _)
146    }
147}
148
149impl<S> IntoInner for FileStat<S> {
150    type Inner = Stat;
151
152    fn into_inner(self) -> Self::Inner {
153        statx_to_stat(self.stat)
154    }
155}
156
157/// Get metadata from path.
158pub struct PathStat {
159    pub(crate) path: CString,
160    pub(crate) stat: Statx,
161    pub(crate) follow_symlink: bool,
162}
163
164impl PathStat {
165    /// Create [`PathStat`].
166    pub fn new(path: CString, follow_symlink: bool) -> Self {
167        Self {
168            path,
169            stat: unsafe { std::mem::zeroed() },
170            follow_symlink,
171        }
172    }
173}
174
175unsafe impl OpCode for PathStat {
176    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
177        let mut flags = libc::AT_EMPTY_PATH;
178        if !self.follow_symlink {
179            flags |= libc::AT_SYMLINK_NOFOLLOW;
180        }
181        opcode::Statx::new(
182            Fd(libc::AT_FDCWD),
183            self.path.as_ptr(),
184            std::ptr::addr_of_mut!(self.stat).cast(),
185        )
186        .flags(flags)
187        .mask(statx_mask())
188        .build()
189        .into()
190    }
191
192    fn call_blocking(mut self: Pin<&mut Self>) -> io::Result<usize> {
193        let mut flags = libc::AT_EMPTY_PATH;
194        if !self.follow_symlink {
195            flags |= libc::AT_SYMLINK_NOFOLLOW;
196        }
197        let res = syscall!(libc::statx(
198            libc::AT_FDCWD,
199            self.path.as_ptr(),
200            flags,
201            statx_mask(),
202            std::ptr::addr_of_mut!(self.stat).cast()
203        ))?;
204        Ok(res as _)
205    }
206}
207
208impl IntoInner for PathStat {
209    type Inner = Stat;
210
211    fn into_inner(self) -> Self::Inner {
212        statx_to_stat(self.stat)
213    }
214}
215
216unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
217    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
218        let this = self.project();
219        let fd = Fd(this.fd.as_fd().as_raw_fd());
220        let slice = this.buffer.sys_slice_mut();
221        opcode::Read::new(
222            fd,
223            slice.ptr() as _,
224            slice.len().try_into().unwrap_or(u32::MAX),
225        )
226        .offset(*this.offset)
227        .build()
228        .into()
229    }
230}
231
232unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
233    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
234        let this = self.project();
235        *this.slices = this.buffer.sys_slices_mut();
236        opcode::Readv::new(
237            Fd(this.fd.as_fd().as_raw_fd()),
238            this.slices.as_ptr() as _,
239            this.slices.len().try_into().unwrap_or(u32::MAX),
240        )
241        .offset(*this.offset)
242        .build()
243        .into()
244    }
245}
246
247unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
248    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
249        let slice = self.buffer.as_init();
250        opcode::Write::new(
251            Fd(self.fd.as_fd().as_raw_fd()),
252            slice.as_ptr(),
253            slice.len().try_into().unwrap_or(u32::MAX),
254        )
255        .offset(self.offset)
256        .build()
257        .into()
258    }
259}
260
261unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
262    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
263        let this = self.project();
264        *this.slices = this.buffer.as_ref().sys_slices();
265        opcode::Writev::new(
266            Fd(this.fd.as_fd().as_raw_fd()),
267            this.slices.as_ptr() as _,
268            this.slices.len().try_into().unwrap_or(u32::MAX),
269        )
270        .offset(*this.offset)
271        .build()
272        .into()
273    }
274}
275
276unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
277    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
278        let fd = self.fd.as_fd().as_raw_fd();
279        let slice = self.project().buffer.sys_slice_mut();
280        opcode::Read::new(
281            Fd(fd),
282            slice.ptr() as _,
283            slice.len().try_into().unwrap_or(u32::MAX),
284        )
285        .build()
286        .into()
287    }
288}
289
290unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
291    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
292        let this = self.project();
293        *this.slices = this.buffer.sys_slices_mut();
294        opcode::Readv::new(
295            Fd(this.fd.as_fd().as_raw_fd()),
296            this.slices.as_ptr() as _,
297            this.slices.len().try_into().unwrap_or(u32::MAX),
298        )
299        .build()
300        .into()
301    }
302}
303
304unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
305    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
306        let slice = self.buffer.as_init();
307        opcode::Write::new(
308            Fd(self.fd.as_fd().as_raw_fd()),
309            slice.as_ptr(),
310            slice.len().try_into().unwrap_or(u32::MAX),
311        )
312        .build()
313        .into()
314    }
315}
316
317unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
318    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
319        let this = self.project();
320        *this.slices = this.buffer.as_ref().sys_slices();
321        opcode::Writev::new(
322            Fd(this.fd.as_fd().as_raw_fd()),
323            this.slices.as_ptr() as _,
324            this.slices.len().try_into().unwrap_or(u32::MAX),
325        )
326        .build()
327        .into()
328    }
329}
330
331unsafe impl<S: AsFd> OpCode for Sync<S> {
332    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
333        opcode::Fsync::new(Fd(self.fd.as_fd().as_raw_fd()))
334            .flags(if self.datasync {
335                FsyncFlags::DATASYNC
336            } else {
337                FsyncFlags::empty()
338            })
339            .build()
340            .into()
341    }
342}
343
344unsafe impl OpCode for Unlink {
345    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
346        opcode::UnlinkAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
347            .flags(if self.dir { libc::AT_REMOVEDIR } else { 0 })
348            .build()
349            .into()
350    }
351
352    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
353        self.call()
354    }
355}
356
357unsafe impl OpCode for CreateDir {
358    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
359        opcode::MkDirAt::new(Fd(libc::AT_FDCWD), self.path.as_ptr())
360            .mode(self.mode)
361            .build()
362            .into()
363    }
364
365    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
366        self.call()
367    }
368}
369
370unsafe impl OpCode for Rename {
371    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
372        opcode::RenameAt::new(
373            Fd(libc::AT_FDCWD),
374            self.old_path.as_ptr(),
375            Fd(libc::AT_FDCWD),
376            self.new_path.as_ptr(),
377        )
378        .build()
379        .into()
380    }
381
382    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
383        self.call()
384    }
385}
386
387unsafe impl OpCode for Symlink {
388    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
389        opcode::SymlinkAt::new(
390            Fd(libc::AT_FDCWD),
391            self.source.as_ptr(),
392            self.target.as_ptr(),
393        )
394        .build()
395        .into()
396    }
397
398    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
399        self.call()
400    }
401}
402
403unsafe impl OpCode for HardLink {
404    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
405        opcode::LinkAt::new(
406            Fd(libc::AT_FDCWD),
407            self.source.as_ptr(),
408            Fd(libc::AT_FDCWD),
409            self.target.as_ptr(),
410        )
411        .build()
412        .into()
413    }
414
415    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
416        self.call()
417    }
418}
419
420unsafe impl OpCode for CreateSocket {
421    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
422        opcode::Socket::new(
423            self.domain,
424            self.socket_type | libc::SOCK_CLOEXEC,
425            self.protocol,
426        )
427        .build()
428        .into()
429    }
430
431    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
432        Ok(syscall!(libc::socket(
433            self.domain,
434            self.socket_type | libc::SOCK_CLOEXEC,
435            self.protocol
436        ))? as _)
437    }
438}
439
440unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
441    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
442        opcode::Shutdown::new(Fd(self.fd.as_fd().as_raw_fd()), self.how())
443            .build()
444            .into()
445    }
446
447    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
448        self.call()
449    }
450}
451
452unsafe impl OpCode for CloseSocket {
453    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
454        opcode::Close::new(Fd(self.fd.as_fd().as_raw_fd()))
455            .build()
456            .into()
457    }
458
459    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
460        self.call()
461    }
462}
463
464unsafe impl<S: AsFd> OpCode for Accept<S> {
465    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
466        let this = self.project();
467        opcode::Accept::new(
468            Fd(this.fd.as_fd().as_raw_fd()),
469            unsafe { this.buffer.view_as::<libc::sockaddr>() },
470            this.addr_len,
471        )
472        .flags(libc::SOCK_CLOEXEC)
473        .build()
474        .into()
475    }
476
477    unsafe fn set_result(self: Pin<&mut Self>, fd: usize) {
478        // SAFETY: fd is a valid fd returned from kernel
479        let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
480        *self.project().accepted_fd = Some(fd);
481    }
482}
483
484unsafe impl<S: AsFd> OpCode for Connect<S> {
485    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
486        opcode::Connect::new(
487            Fd(self.fd.as_fd().as_raw_fd()),
488            self.addr.as_ptr().cast(),
489            self.addr.len(),
490        )
491        .build()
492        .into()
493    }
494}
495
496unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
497    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
498        let fd = self.fd.as_fd().as_raw_fd();
499        let flags = self.flags;
500        let slice = self.project().buffer.sys_slice_mut();
501        opcode::Recv::new(
502            Fd(fd),
503            slice.ptr() as _,
504            slice.len().try_into().unwrap_or(u32::MAX),
505        )
506        .flags(flags)
507        .build()
508        .into()
509    }
510}
511
512unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
513    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
514        self.as_mut().set_msg();
515        let this = self.project();
516        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
517            .flags(*this.flags as _)
518            .build()
519            .into()
520    }
521}
522
523unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
524    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
525        let slice = self.buffer.as_init();
526        opcode::Send::new(
527            Fd(self.fd.as_fd().as_raw_fd()),
528            slice.as_ptr(),
529            slice.len().try_into().unwrap_or(u32::MAX),
530        )
531        .flags(self.flags)
532        .build()
533        .into()
534    }
535}
536
537unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
538    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
539        self.as_mut().set_msg();
540        let this = self.project();
541        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
542            .flags(*this.flags as _)
543            .build()
544            .into()
545    }
546}
547
548struct RecvFromHeader<S> {
549    pub(crate) fd: S,
550    pub(crate) addr: SockAddrStorage,
551    pub(crate) msg: libc::msghdr,
552    pub(crate) flags: i32,
553    _p: PhantomPinned,
554}
555
556impl<S> RecvFromHeader<S> {
557    pub fn new(fd: S, flags: i32) -> Self {
558        Self {
559            fd,
560            addr: SockAddrStorage::zeroed(),
561            msg: unsafe { std::mem::zeroed() },
562            flags,
563            _p: PhantomPinned,
564        }
565    }
566}
567
568impl<S: AsFd> RecvFromHeader<S> {
569    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
570        self.msg.msg_name = &mut self.addr as *mut _ as _;
571        self.msg.msg_namelen = self.addr.size_of() as _;
572        self.msg.msg_iov = slices.as_mut_ptr() as _;
573        self.msg.msg_iovlen = slices.len() as _;
574        opcode::RecvMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &mut self.msg)
575            .flags(self.flags as _)
576            .build()
577            .into()
578    }
579
580    pub fn into_addr(self) -> (SockAddrStorage, socklen_t) {
581        (self.addr, self.msg.msg_namelen)
582    }
583}
584
585pin_project! {
586    /// Receive data and source address.
587    pub struct RecvFrom<T: IoBufMut, S> {
588        header: RecvFromHeader<S>,
589        #[pin]
590        buffer: T,
591        slice: Option<SysSlice>,
592    }
593}
594
595impl<T: IoBufMut, S> RecvFrom<T, S> {
596    /// Create [`RecvFrom`].
597    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
598        Self {
599            header: RecvFromHeader::new(fd, flags),
600            buffer,
601            slice: None,
602        }
603    }
604}
605
606unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
607    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
608        let this = self.project();
609        let slice = this.slice.insert(this.buffer.sys_slice_mut());
610        this.header.create_entry(std::slice::from_mut(slice))
611    }
612}
613
614impl<T: IoBufMut, S: AsFd> IntoInner for RecvFrom<T, S> {
615    type Inner = (T, SockAddrStorage, socklen_t);
616
617    fn into_inner(self) -> Self::Inner {
618        let (addr, addr_len) = self.header.into_addr();
619        (self.buffer, addr, addr_len)
620    }
621}
622
623pin_project! {
624    /// Receive data and source address into vectored buffer.
625    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
626        header: RecvFromHeader<S>,
627        #[pin]
628        buffer: T,
629        slice: Vec<SysSlice>,
630    }
631}
632
633impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
634    /// Create [`RecvFromVectored`].
635    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
636        Self {
637            header: RecvFromHeader::new(fd, flags),
638            buffer,
639            slice: vec![],
640        }
641    }
642}
643
644unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
645    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
646        let this = self.project();
647        *this.slice = this.buffer.sys_slices_mut();
648        this.header.create_entry(this.slice)
649    }
650}
651
652impl<T: IoVectoredBufMut, S: AsFd> IntoInner for RecvFromVectored<T, S> {
653    type Inner = (T, SockAddrStorage, socklen_t);
654
655    fn into_inner(self) -> Self::Inner {
656        let (addr, addr_len) = self.header.into_addr();
657        (self.buffer, addr, addr_len)
658    }
659}
660
661struct SendToHeader<S> {
662    pub(crate) fd: S,
663    pub(crate) addr: SockAddr,
664    pub(crate) msg: libc::msghdr,
665    pub(crate) flags: i32,
666    _p: PhantomPinned,
667}
668
669impl<S> SendToHeader<S> {
670    pub fn new(fd: S, addr: SockAddr, flags: i32) -> Self {
671        Self {
672            fd,
673            addr,
674            msg: unsafe { std::mem::zeroed() },
675            flags,
676            _p: PhantomPinned,
677        }
678    }
679}
680
681impl<S: AsFd> SendToHeader<S> {
682    pub fn create_entry(&mut self, slices: &mut [SysSlice]) -> OpEntry {
683        self.msg.msg_name = self.addr.as_ptr() as _;
684        self.msg.msg_namelen = self.addr.len();
685        self.msg.msg_iov = slices.as_mut_ptr() as _;
686        self.msg.msg_iovlen = slices.len() as _;
687        opcode::SendMsg::new(Fd(self.fd.as_fd().as_raw_fd()), &self.msg)
688            .flags(self.flags as _)
689            .build()
690            .into()
691    }
692}
693
694pin_project! {
695    /// Send data to specified address.
696    pub struct SendTo<T: IoBuf, S> {
697        header: SendToHeader<S>,
698        #[pin]
699        buffer: T,
700        slice: Option<SysSlice>,
701    }
702}
703
704impl<T: IoBuf, S> SendTo<T, S> {
705    /// Create [`SendTo`].
706    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
707        Self {
708            header: SendToHeader::new(fd, addr, flags),
709            buffer,
710            slice: None,
711        }
712    }
713}
714
715unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
716    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
717        let this = self.project();
718        let slice = this.slice.insert(this.buffer.as_ref().sys_slice());
719        this.header.create_entry(std::slice::from_mut(slice))
720    }
721}
722
723impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
724    type Inner = T;
725
726    fn into_inner(self) -> Self::Inner {
727        self.buffer
728    }
729}
730
731pin_project! {
732    /// Send data to specified address from vectored buffer.
733    pub struct SendToVectored<T: IoVectoredBuf, S> {
734        header: SendToHeader<S>,
735        #[pin]
736        buffer: T,
737        slice: Vec<SysSlice>,
738    }
739}
740
741impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
742    /// Create [`SendToVectored`].
743    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
744        Self {
745            header: SendToHeader::new(fd, addr, flags),
746            buffer,
747            slice: vec![],
748        }
749    }
750}
751
752unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
753    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
754        let this = self.project();
755        *this.slice = this.buffer.as_ref().sys_slices();
756        this.header.create_entry(this.slice)
757    }
758}
759
760impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
761    type Inner = T;
762
763    fn into_inner(self) -> Self::Inner {
764        self.buffer
765    }
766}
767
768unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
769    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
770        self.as_mut().set_msg();
771        let this = self.project();
772        opcode::RecvMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
773            .flags(*this.flags as _)
774            .build()
775            .into()
776    }
777}
778
779unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
780    fn create_entry(mut self: Pin<&mut Self>) -> OpEntry {
781        self.as_mut().set_msg();
782        let this = self.project();
783        opcode::SendMsg::new(Fd(this.fd.as_fd().as_raw_fd()), this.msg)
784            .flags(*this.flags as _)
785            .build()
786            .into()
787    }
788}
789
790unsafe impl<S: AsFd> OpCode for PollOnce<S> {
791    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
792        let flags = match self.interest {
793            Interest::Readable => libc::POLLIN,
794            Interest::Writable => libc::POLLOUT,
795        };
796        opcode::PollAdd::new(Fd(self.fd.as_fd().as_raw_fd()), flags as _)
797            .build()
798            .into()
799    }
800}
801
802unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
803    fn create_entry(self: Pin<&mut Self>) -> OpEntry {
804        opcode::Splice::new(
805            Fd(self.fd_in.as_fd().as_raw_fd()),
806            self.offset_in,
807            Fd(self.fd_out.as_fd().as_raw_fd()),
808            self.offset_out,
809            self.len.try_into().unwrap_or(u32::MAX),
810        )
811        .flags(self.flags)
812        .build()
813        .into()
814    }
815
816    fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
817        let mut offset_in = self.offset_in;
818        let mut offset_out = self.offset_out;
819        let offset_in_ptr = if offset_in < 0 {
820            std::ptr::null_mut()
821        } else {
822            &mut offset_in
823        };
824        let offset_out_ptr = if offset_out < 0 {
825            std::ptr::null_mut()
826        } else {
827            &mut offset_out
828        };
829        Ok(syscall!(libc::splice(
830            self.fd_in.as_fd().as_raw_fd(),
831            offset_in_ptr,
832            self.fd_out.as_fd().as_raw_fd(),
833            offset_out_ptr,
834            self.len,
835            self.flags,
836        ))? as _)
837    }
838}
839
840mod buf_ring {
841    use std::{
842        io,
843        marker::PhantomPinned,
844        os::fd::{AsFd, AsRawFd},
845        pin::Pin,
846        ptr,
847    };
848
849    use io_uring::{opcode, squeue::Flags, types::Fd};
850
851    use super::OpCode;
852    use crate::{BorrowedBuffer, BufferPool, OpEntry, TakeBuffer};
853
854    /// Read a file at specified position into specified buffer.
855    #[derive(Debug)]
856    pub struct ReadManagedAt<S> {
857        pub(crate) fd: S,
858        pub(crate) offset: u64,
859        buffer_group: u16,
860        len: u32,
861        _p: PhantomPinned,
862    }
863
864    impl<S> ReadManagedAt<S> {
865        /// Create [`ReadManagedAt`].
866        pub fn new(fd: S, offset: u64, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
867            #[cfg(fusion)]
868            let buffer_pool = buffer_pool.as_io_uring();
869            Ok(Self {
870                fd,
871                offset,
872                buffer_group: buffer_pool.buffer_group(),
873                len: len.try_into().map_err(|_| {
874                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
875                })?,
876                _p: PhantomPinned,
877            })
878        }
879    }
880
881    unsafe impl<S: AsFd> OpCode for ReadManagedAt<S> {
882        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
883            let fd = Fd(self.fd.as_fd().as_raw_fd());
884            let offset = self.offset;
885            opcode::Read::new(fd, ptr::null_mut(), self.len)
886                .offset(offset)
887                .buf_group(self.buffer_group)
888                .build()
889                .flags(Flags::BUFFER_SELECT)
890                .into()
891        }
892    }
893
894    impl<S> TakeBuffer for ReadManagedAt<S> {
895        type Buffer<'a> = BorrowedBuffer<'a>;
896        type BufferPool = BufferPool;
897
898        fn take_buffer(
899            self,
900            buffer_pool: &Self::BufferPool,
901            result: io::Result<usize>,
902            buffer_id: u16,
903        ) -> io::Result<Self::Buffer<'_>> {
904            #[cfg(fusion)]
905            let buffer_pool = buffer_pool.as_io_uring();
906            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
907            // SAFETY: result is valid
908            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
909            #[cfg(fusion)]
910            let res = res.map(BorrowedBuffer::new_io_uring);
911            res
912        }
913    }
914
915    /// Receive data from remote.
916    pub struct ReadManaged<S> {
917        fd: S,
918        buffer_group: u16,
919        len: u32,
920        _p: PhantomPinned,
921    }
922
923    impl<S> ReadManaged<S> {
924        /// Create [`ReadManaged`].
925        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize) -> io::Result<Self> {
926            #[cfg(fusion)]
927            let buffer_pool = buffer_pool.as_io_uring();
928            Ok(Self {
929                fd,
930                buffer_group: buffer_pool.buffer_group(),
931                len: len.try_into().map_err(|_| {
932                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
933                })?,
934                _p: PhantomPinned,
935            })
936        }
937    }
938
939    unsafe impl<S: AsFd> OpCode for ReadManaged<S> {
940        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
941            let fd = self.fd.as_fd().as_raw_fd();
942            opcode::Read::new(Fd(fd), ptr::null_mut(), self.len)
943                .buf_group(self.buffer_group)
944                .build()
945                .flags(Flags::BUFFER_SELECT)
946                .into()
947        }
948    }
949
950    impl<S> TakeBuffer for ReadManaged<S> {
951        type Buffer<'a> = BorrowedBuffer<'a>;
952        type BufferPool = BufferPool;
953
954        fn take_buffer(
955            self,
956            buffer_pool: &Self::BufferPool,
957            result: io::Result<usize>,
958            buffer_id: u16,
959        ) -> io::Result<Self::Buffer<'_>> {
960            #[cfg(fusion)]
961            let buffer_pool = buffer_pool.as_io_uring();
962            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
963            // SAFETY: result is valid
964            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
965            #[cfg(fusion)]
966            let res = res.map(BorrowedBuffer::new_io_uring);
967            res
968        }
969    }
970
971    /// Receive data from remote.
972    pub struct RecvManaged<S> {
973        fd: S,
974        buffer_group: u16,
975        len: u32,
976        flags: i32,
977        _p: PhantomPinned,
978    }
979
980    impl<S> RecvManaged<S> {
981        /// Create [`RecvManaged`].
982        pub fn new(fd: S, buffer_pool: &BufferPool, len: usize, flags: i32) -> io::Result<Self> {
983            #[cfg(fusion)]
984            let buffer_pool = buffer_pool.as_io_uring();
985            Ok(Self {
986                fd,
987                buffer_group: buffer_pool.buffer_group(),
988                len: len.try_into().map_err(|_| {
989                    io::Error::new(io::ErrorKind::InvalidInput, "required length too long")
990                })?,
991                flags,
992                _p: PhantomPinned,
993            })
994        }
995    }
996
997    unsafe impl<S: AsFd> OpCode for RecvManaged<S> {
998        fn create_entry(self: Pin<&mut Self>) -> OpEntry {
999            let fd = self.fd.as_fd().as_raw_fd();
1000            opcode::Recv::new(Fd(fd), ptr::null_mut(), self.len)
1001                .flags(self.flags)
1002                .buf_group(self.buffer_group)
1003                .build()
1004                .flags(Flags::BUFFER_SELECT)
1005                .into()
1006        }
1007    }
1008
1009    impl<S> TakeBuffer for RecvManaged<S> {
1010        type Buffer<'a> = BorrowedBuffer<'a>;
1011        type BufferPool = BufferPool;
1012
1013        fn take_buffer(
1014            self,
1015            buffer_pool: &Self::BufferPool,
1016            result: io::Result<usize>,
1017            buffer_id: u16,
1018        ) -> io::Result<Self::Buffer<'_>> {
1019            #[cfg(fusion)]
1020            let buffer_pool = buffer_pool.as_io_uring();
1021            let result = result.inspect_err(|_| buffer_pool.reuse_buffer(buffer_id))?;
1022            // SAFETY: result is valid
1023            let res = unsafe { buffer_pool.get_buffer(buffer_id, result) };
1024            #[cfg(fusion)]
1025            let res = res.map(BorrowedBuffer::new_io_uring);
1026            res
1027        }
1028    }
1029}
1030
1031pub use buf_ring::{ReadManaged, ReadManagedAt, RecvManaged};