Skip to main content

compio_driver/sys/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
13#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
14use libc::{pread, preadv, pwrite, pwritev};
15#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
16use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
17use pin_project_lite::pin_project;
18use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
19
20use super::{AsFd, Decision, OpCode, OpType, syscall};
21pub use crate::sys::unix_op::*;
22use crate::{op::*, sys_slice::*};
23
24unsafe impl<
25    D: std::marker::Send + 'static,
26    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
27> OpCode for Asyncify<F, D>
28{
29    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
30        Ok(Decision::Blocking)
31    }
32
33    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
34        let this = self.project();
35        let f = this
36            .f
37            .take()
38            .expect("the operate method could only be called once");
39        let BufResult(res, data) = f();
40        *this.data = Some(data);
41        Poll::Ready(res)
42    }
43}
44
45unsafe impl<
46    S,
47    D: std::marker::Send + 'static,
48    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
49> OpCode for AsyncifyFd<S, F, D>
50{
51    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
52        Ok(Decision::Blocking)
53    }
54
55    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
56        let this = self.project();
57        let f = this
58            .f
59            .take()
60            .expect("the operate method could only be called once");
61        let BufResult(res, data) = f(this.fd);
62        *this.data = Some(data);
63        Poll::Ready(res)
64    }
65}
66
67unsafe impl OpCode for OpenFile {
68    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
69        Ok(Decision::Blocking)
70    }
71
72    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
73        Poll::Ready(self.call())
74    }
75}
76
77unsafe impl OpCode for CloseFile {
78    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
79        Ok(Decision::Blocking)
80    }
81
82    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
83        Poll::Ready(self.call())
84    }
85}
86
87unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
88    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
89        Ok(Decision::Blocking)
90    }
91
92    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
93        Poll::Ready(self.call())
94    }
95}
96
97pin_project! {
98    /// Get metadata of an opened file.
99    pub struct FileStat<S> {
100        pub(crate) fd: S,
101        pub(crate) stat: Stat,
102    }
103}
104
105impl<S> FileStat<S> {
106    /// Create [`FileStat`].
107    pub fn new(fd: S) -> Self {
108        Self {
109            fd,
110            stat: unsafe { std::mem::zeroed() },
111        }
112    }
113}
114
115unsafe impl<S: AsFd> OpCode for FileStat<S> {
116    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
117        Ok(Decision::Blocking)
118    }
119
120    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
121        let this = self.project();
122        #[cfg(gnulinux)]
123        {
124            let mut s: libc::statx = unsafe { std::mem::zeroed() };
125            static EMPTY_NAME: &[u8] = b"\0";
126            syscall!(libc::statx(
127                this.fd.as_fd().as_raw_fd(),
128                EMPTY_NAME.as_ptr().cast(),
129                libc::AT_EMPTY_PATH,
130                statx_mask(),
131                &mut s
132            ))?;
133            *this.stat = statx_to_stat(s);
134            Poll::Ready(Ok(0))
135        }
136        #[cfg(not(gnulinux))]
137        {
138            Poll::Ready(Ok(
139                syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), this.stat))? as _,
140            ))
141        }
142    }
143}
144
145impl<S> IntoInner for FileStat<S> {
146    type Inner = Stat;
147
148    fn into_inner(self) -> Self::Inner {
149        self.stat
150    }
151}
152
153/// Get metadata from path.
154pub struct PathStat {
155    pub(crate) path: CString,
156    pub(crate) stat: Stat,
157    pub(crate) follow_symlink: bool,
158}
159
160impl PathStat {
161    /// Create [`PathStat`].
162    pub fn new(path: CString, follow_symlink: bool) -> Self {
163        Self {
164            path,
165            stat: unsafe { std::mem::zeroed() },
166            follow_symlink,
167        }
168    }
169}
170
171unsafe impl OpCode for PathStat {
172    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
173        Ok(Decision::Blocking)
174    }
175
176    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
177        #[cfg(gnulinux)]
178        {
179            let mut flags = libc::AT_EMPTY_PATH;
180            if !self.follow_symlink {
181                flags |= libc::AT_SYMLINK_NOFOLLOW;
182            }
183            let mut s: libc::statx = unsafe { std::mem::zeroed() };
184            syscall!(libc::statx(
185                libc::AT_FDCWD,
186                self.path.as_ptr(),
187                flags,
188                statx_mask(),
189                &mut s
190            ))?;
191            self.stat = statx_to_stat(s);
192            Poll::Ready(Ok(0))
193        }
194        #[cfg(not(gnulinux))]
195        {
196            let f = if self.follow_symlink {
197                libc::stat
198            } else {
199                libc::lstat
200            };
201            Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
202        }
203    }
204}
205
206impl IntoInner for PathStat {
207    type Inner = Stat;
208
209    fn into_inner(self) -> Self::Inner {
210        self.stat
211    }
212}
213
214unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
215    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
216        #[cfg(aio)]
217        {
218            let this = self.project();
219            let slice = this.buffer.sys_slice_mut();
220
221            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
222            this.aiocb.aio_offset = *this.offset as _;
223            this.aiocb.aio_buf = slice.ptr().cast();
224            this.aiocb.aio_nbytes = slice.len();
225
226            Ok(Decision::aio(this.aiocb, libc::aio_read))
227        }
228        #[cfg(not(aio))]
229        {
230            Ok(Decision::Blocking)
231        }
232    }
233
234    #[cfg(aio)]
235    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
236        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
237    }
238
239    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
240        let fd = self.fd.as_fd().as_raw_fd();
241        let offset = self.offset;
242        let slice = self.project().buffer.sys_slice_mut();
243        syscall!(break pread(fd, slice.ptr() as _, slice.len() as _, offset as _,))
244    }
245}
246
247unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
248    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
249        #[cfg(freebsd)]
250        {
251            let this = self.project();
252            *this.slices = this.buffer.sys_slices_mut();
253
254            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
255            this.aiocb.aio_offset = *this.offset as _;
256            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
257            this.aiocb.aio_nbytes = this.slices.len();
258
259            Ok(Decision::aio(this.aiocb, libc::aio_readv))
260        }
261        #[cfg(not(freebsd))]
262        {
263            Ok(Decision::Blocking)
264        }
265    }
266
267    #[cfg(freebsd)]
268    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
269        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
270    }
271
272    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
273        let this = self.project();
274        *this.slices = this.buffer.sys_slices_mut();
275        syscall!(
276            break preadv(
277                this.fd.as_fd().as_raw_fd(),
278                this.slices.as_ptr() as _,
279                this.slices.len() as _,
280                *this.offset as _,
281            )
282        )
283    }
284}
285
286unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
287    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
288        #[cfg(aio)]
289        {
290            let this = self.project();
291            let slice = this.buffer.as_ref().sys_slice();
292
293            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
294            this.aiocb.aio_offset = *this.offset as _;
295            this.aiocb.aio_buf = slice.ptr().cast();
296            this.aiocb.aio_nbytes = slice.len();
297
298            Ok(Decision::aio(this.aiocb, libc::aio_write))
299        }
300        #[cfg(not(aio))]
301        {
302            Ok(Decision::Blocking)
303        }
304    }
305
306    #[cfg(aio)]
307    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
308        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
309    }
310
311    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
312        let slice = self.buffer.as_init();
313        syscall!(
314            break pwrite(
315                self.fd.as_fd().as_raw_fd(),
316                slice.as_ptr() as _,
317                slice.len() as _,
318                self.offset as _,
319            )
320        )
321    }
322}
323
324unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
325    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
326        #[cfg(freebsd)]
327        {
328            let this = self.project();
329            *this.slices = this.buffer.as_ref().sys_slices();
330
331            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
332            this.aiocb.aio_offset = *this.offset as _;
333            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
334            this.aiocb.aio_nbytes = this.slices.len();
335
336            Ok(Decision::aio(this.aiocb, libc::aio_writev))
337        }
338        #[cfg(not(freebsd))]
339        {
340            Ok(Decision::Blocking)
341        }
342    }
343
344    #[cfg(freebsd)]
345    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
346        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
347    }
348
349    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
350        let this = self.project();
351        *this.slices = this.buffer.as_ref().sys_slices();
352        syscall!(
353            break pwritev(
354                this.fd.as_fd().as_raw_fd(),
355                this.slices.as_ptr() as _,
356                this.slices.len() as _,
357                *this.offset as _,
358            )
359        )
360    }
361}
362
363unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
364    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
365        self.project().op.pre_submit()
366    }
367
368    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
369        self.project().op.op_type()
370    }
371
372    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
373        self.project().op.operate()
374    }
375}
376
377unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
378    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
379        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
380    }
381
382    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
383        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
384    }
385
386    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
387        let fd = self.fd.as_fd().as_raw_fd();
388        let slice = self.project().buffer.sys_slice_mut();
389        syscall!(break libc::read(fd, slice.ptr() as _, slice.len()))
390    }
391}
392
393unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
394    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
395        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
396    }
397
398    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
399        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
400    }
401
402    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
403        let this = self.project();
404        *this.slices = this.buffer.sys_slices_mut();
405        syscall!(
406            break libc::readv(
407                this.fd.as_fd().as_raw_fd(),
408                this.slices.as_ptr() as _,
409                this.slices.len() as _
410            )
411        )
412    }
413}
414
415unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
416    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
417        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
418    }
419
420    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
421        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
422    }
423
424    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
425        let slice = self.buffer.as_init();
426        syscall!(
427            break libc::write(
428                self.fd.as_fd().as_raw_fd(),
429                slice.as_ptr() as _,
430                slice.len()
431            )
432        )
433    }
434}
435
436unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
437    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
438        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
439    }
440
441    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
442        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
443    }
444
445    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
446        let this = self.project();
447        *this.slices = this.buffer.as_ref().sys_slices();
448        syscall!(
449            break libc::writev(
450                this.fd.as_fd().as_raw_fd(),
451                this.slices.as_ptr() as _,
452                this.slices.len() as _
453            )
454        )
455    }
456}
457
458unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManaged<S> {
459    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
460        self.project().op.pre_submit()
461    }
462
463    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
464        self.project().op.op_type()
465    }
466
467    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
468        self.project().op.operate()
469    }
470}
471
472unsafe impl<S: AsFd> OpCode for Sync<S> {
473    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
474        #[cfg(aio)]
475        {
476            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
477                unsafe { libc::aio_fsync(libc::O_SYNC, aiocbp) }
478            }
479            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
480                unsafe { libc::aio_fsync(libc::O_DSYNC, aiocbp) }
481            }
482
483            let this = self.project();
484            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
485
486            let f = if *this.datasync {
487                aio_fdatasync
488            } else {
489                aio_fsync
490            };
491
492            Ok(Decision::aio(this.aiocb, f))
493        }
494        #[cfg(not(aio))]
495        {
496            Ok(Decision::Blocking)
497        }
498    }
499
500    #[cfg(aio)]
501    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
502        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
503    }
504
505    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
506        #[cfg(datasync)]
507        {
508            Poll::Ready(Ok(syscall!(if self.datasync {
509                libc::fdatasync(self.fd.as_fd().as_raw_fd())
510            } else {
511                libc::fsync(self.fd.as_fd().as_raw_fd())
512            })? as _))
513        }
514        #[cfg(not(datasync))]
515        {
516            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
517        }
518    }
519}
520
521unsafe impl OpCode for Unlink {
522    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
523        Ok(Decision::Blocking)
524    }
525
526    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
527        Poll::Ready(self.call())
528    }
529}
530
531unsafe impl OpCode for CreateDir {
532    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
533        Ok(Decision::Blocking)
534    }
535
536    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
537        Poll::Ready(self.call())
538    }
539}
540
541unsafe impl OpCode for Rename {
542    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
543        Ok(Decision::Blocking)
544    }
545
546    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
547        Poll::Ready(self.call())
548    }
549}
550
551unsafe impl OpCode for Symlink {
552    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
553        Ok(Decision::Blocking)
554    }
555
556    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
557        Poll::Ready(self.call())
558    }
559}
560
561unsafe impl OpCode for HardLink {
562    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
563        Ok(Decision::Blocking)
564    }
565
566    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
567        Poll::Ready(self.call())
568    }
569}
570
571impl CreateSocket {
572    unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
573        #[allow(unused_mut)]
574        let mut ty: i32 = self.socket_type;
575        #[cfg(any(
576            target_os = "android",
577            target_os = "dragonfly",
578            target_os = "freebsd",
579            target_os = "fuchsia",
580            target_os = "hurd",
581            target_os = "illumos",
582            target_os = "linux",
583            target_os = "netbsd",
584            target_os = "openbsd",
585            target_os = "cygwin",
586        ))]
587        {
588            ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
589        }
590        let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
591        let socket = unsafe { Socket2::from_raw_fd(fd) };
592        #[cfg(not(any(
593            target_os = "android",
594            target_os = "dragonfly",
595            target_os = "freebsd",
596            target_os = "fuchsia",
597            target_os = "hurd",
598            target_os = "illumos",
599            target_os = "linux",
600            target_os = "netbsd",
601            target_os = "openbsd",
602            target_os = "espidf",
603            target_os = "vita",
604            target_os = "cygwin",
605        )))]
606        socket.set_cloexec(true)?;
607        #[cfg(any(
608            target_os = "ios",
609            target_os = "macos",
610            target_os = "tvos",
611            target_os = "watchos",
612        ))]
613        socket.set_nosigpipe(true)?;
614        #[cfg(not(any(
615            target_os = "android",
616            target_os = "dragonfly",
617            target_os = "freebsd",
618            target_os = "fuchsia",
619            target_os = "hurd",
620            target_os = "illumos",
621            target_os = "linux",
622            target_os = "netbsd",
623            target_os = "openbsd",
624            target_os = "cygwin",
625        )))]
626        socket.set_nonblocking(true)?;
627        Ok(socket.into_raw_fd())
628    }
629}
630
631unsafe impl OpCode for CreateSocket {
632    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
633        Ok(Decision::Blocking)
634    }
635
636    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
637        Poll::Ready(Ok(unsafe { self.call()? } as _))
638    }
639}
640
641unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
642    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
643        Ok(Decision::Blocking)
644    }
645
646    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
647        Poll::Ready(self.call())
648    }
649}
650
651unsafe impl OpCode for CloseSocket {
652    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
653        Ok(Decision::Blocking)
654    }
655
656    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
657        Poll::Ready(self.call())
658    }
659}
660
661impl<S: AsFd> Accept<S> {
662    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
663        let this = self.project();
664        #[cfg(any(
665            target_os = "android",
666            target_os = "dragonfly",
667            target_os = "freebsd",
668            target_os = "fuchsia",
669            target_os = "illumos",
670            target_os = "linux",
671            target_os = "netbsd",
672            target_os = "openbsd",
673            target_os = "cygwin",
674        ))]
675        unsafe {
676            libc::accept4(
677                this.fd.as_fd().as_raw_fd(),
678                this.buffer as *mut _ as *mut _,
679                this.addr_len,
680                libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
681            )
682        }
683        #[cfg(not(any(
684            target_os = "android",
685            target_os = "dragonfly",
686            target_os = "freebsd",
687            target_os = "fuchsia",
688            target_os = "illumos",
689            target_os = "linux",
690            target_os = "netbsd",
691            target_os = "openbsd",
692            target_os = "cygwin",
693        )))]
694        {
695            || -> io::Result<libc::c_int> {
696                let fd = syscall!(libc::accept(
697                    this.fd.as_fd().as_raw_fd(),
698                    this.buffer as *mut _ as *mut _,
699                    this.addr_len,
700                ))?;
701                let socket = unsafe { Socket2::from_raw_fd(fd) };
702                socket.set_cloexec(true)?;
703                socket.set_nonblocking(true)?;
704                Ok(socket.into_raw_fd())
705            }()
706            .unwrap_or(-1)
707        }
708    }
709}
710
711unsafe impl<S: AsFd> OpCode for Accept<S> {
712    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
713        let fd = self.fd.as_fd().as_raw_fd();
714        syscall!(self.as_mut().call(), wait_readable(fd))
715    }
716
717    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
718        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
719    }
720
721    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
722        let res = syscall!(break self.as_mut().call());
723        if let Poll::Ready(Ok(fd)) = res {
724            // Safety: we own the fd returned by accept/accept4
725            let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
726            *self.project().accepted_fd = Some(fd);
727        }
728        res
729    }
730}
731
732unsafe impl<S: AsFd> OpCode for Connect<S> {
733    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
734        syscall!(
735            libc::connect(
736                self.fd.as_fd().as_raw_fd(),
737                self.addr.as_ptr().cast(),
738                self.addr.len()
739            ),
740            wait_writable(self.fd.as_fd().as_raw_fd())
741        )
742    }
743
744    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
745        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
746    }
747
748    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
749        let mut err: libc::c_int = 0;
750        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
751
752        syscall!(libc::getsockopt(
753            self.fd.as_fd().as_raw_fd(),
754            libc::SOL_SOCKET,
755            libc::SO_ERROR,
756            &mut err as *mut _ as *mut _,
757            &mut err_len
758        ))?;
759
760        let res = if err == 0 {
761            Ok(0)
762        } else {
763            Err(io::Error::from_raw_os_error(err))
764        };
765        Poll::Ready(res)
766    }
767}
768
769unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
770    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
771        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
772    }
773
774    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
775        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
776    }
777
778    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
779        let fd = self.fd.as_fd().as_raw_fd();
780        let flags = self.flags;
781        let slice = self.project().buffer.sys_slice_mut();
782        syscall!(break libc::recv(fd, slice.ptr() as _, slice.len(), flags))
783    }
784}
785
786unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
787    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
788        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
789    }
790
791    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
792        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
793    }
794
795    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
796        let slice = self.buffer.as_init();
797        syscall!(
798            break libc::send(
799                self.fd.as_fd().as_raw_fd(),
800                slice.as_ptr() as _,
801                slice.len(),
802                self.flags,
803            )
804        )
805    }
806}
807
808unsafe impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
809    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
810        self.project().op.pre_submit()
811    }
812
813    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
814        self.project().op.op_type()
815    }
816
817    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
818        self.project().op.operate()
819    }
820}
821
822impl<T: IoVectoredBufMut, S: AsFd> RecvVectored<T, S> {
823    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
824        let this = self.project();
825        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
826    }
827}
828
829unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
830    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
831        self.as_mut().set_msg();
832        let fd = self.fd.as_fd().as_raw_fd();
833        syscall!(self.as_mut().call(), wait_readable(fd))
834    }
835
836    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
837        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
838    }
839
840    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
841        syscall!(break self.as_mut().call())
842    }
843}
844
845impl<T: IoVectoredBuf, S: AsFd> SendVectored<T, S> {
846    unsafe fn call(&self) -> libc::ssize_t {
847        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
848    }
849}
850
851unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
852    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
853        self.as_mut().set_msg();
854        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
855        syscall!(self.as_mut().call(), wait_writable(fd))
856    }
857
858    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
859        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
860    }
861
862    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
863        syscall!(break self.call())
864    }
865}
866
867pin_project! {
868    /// Receive data and source address.
869    pub struct RecvFrom<T: IoBufMut, S> {
870        pub(crate) fd: S,
871        #[pin]
872        pub(crate) buffer: T,
873        pub(crate) addr: SockAddrStorage,
874        pub(crate) addr_len: socklen_t,
875        pub(crate) flags: i32,
876        _p: PhantomPinned,
877    }
878}
879
880impl<T: IoBufMut, S> RecvFrom<T, S> {
881    /// Create [`RecvFrom`].
882    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
883        let addr = SockAddrStorage::zeroed();
884        let addr_len = addr.size_of();
885        Self {
886            fd,
887            buffer,
888            addr,
889            addr_len,
890            flags,
891            _p: PhantomPinned,
892        }
893    }
894}
895
896impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
897    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
898        let fd = self.fd.as_fd().as_raw_fd();
899        let this = self.project();
900        let slice = this.buffer.sys_slice_mut();
901        unsafe {
902            libc::recvfrom(
903                fd,
904                slice.ptr() as _,
905                slice.len(),
906                *this.flags,
907                this.addr as *mut _ as _,
908                this.addr_len,
909            )
910        }
911    }
912}
913
914unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
915    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
916        let fd = self.fd.as_fd().as_raw_fd();
917        syscall!(self.as_mut().call(), wait_readable(fd))
918    }
919
920    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
921        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
922    }
923
924    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
925        syscall!(break self.as_mut().call())
926    }
927}
928
929impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
930    type Inner = (T, SockAddrStorage, socklen_t);
931
932    fn into_inner(self) -> Self::Inner {
933        (self.buffer, self.addr, self.addr_len)
934    }
935}
936
937pin_project! {
938    /// Receive data and source address into vectored buffer.
939    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
940        pub(crate) fd: S,
941        #[pin]
942        pub(crate) buffer: T,
943        pub(crate) slices: Vec<SysSlice>,
944        pub(crate) addr: SockAddrStorage,
945        pub(crate) msg: libc::msghdr,
946        pub(crate) flags: i32,
947        _p: PhantomPinned,
948    }
949}
950
951impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
952    /// Create [`RecvFromVectored`].
953    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
954        Self {
955            fd,
956            buffer,
957            slices: vec![],
958            addr: SockAddrStorage::zeroed(),
959            msg: unsafe { std::mem::zeroed() },
960            flags,
961            _p: PhantomPinned,
962        }
963    }
964}
965
966impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
967    fn set_msg(self: Pin<&mut Self>) {
968        let this = self.project();
969        *this.slices = this.buffer.sys_slices_mut();
970        this.msg.msg_name = this.addr as *mut _ as _;
971        this.msg.msg_namelen = this.addr.size_of() as _;
972        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
973        this.msg.msg_iovlen = this.slices.len() as _;
974    }
975
976    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
977        let this = self.project();
978        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
979    }
980}
981
982unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
983    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
984        self.as_mut().set_msg();
985        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
986        syscall!(self.as_mut().call(), wait_readable(fd))
987    }
988
989    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
990        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
991    }
992
993    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
994        syscall!(break self.as_mut().call())
995    }
996}
997
998impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
999    type Inner = (T, SockAddrStorage, socklen_t);
1000
1001    fn into_inner(self) -> Self::Inner {
1002        (self.buffer, self.addr, self.msg.msg_namelen)
1003    }
1004}
1005
1006pin_project! {
1007    /// Send data to specified address.
1008    pub struct SendTo<T: IoBuf, S> {
1009        pub(crate) fd: S,
1010        pub(crate) buffer: T,
1011        pub(crate) addr: SockAddr,
1012        flags: i32,
1013        _p: PhantomPinned,
1014    }
1015}
1016
1017impl<T: IoBuf, S> SendTo<T, S> {
1018    /// Create [`SendTo`].
1019    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1020        Self {
1021            fd,
1022            buffer,
1023            addr,
1024            flags,
1025            _p: PhantomPinned,
1026        }
1027    }
1028}
1029
1030impl<T: IoBuf, S: AsFd> SendTo<T, S> {
1031    unsafe fn call(&self) -> libc::ssize_t {
1032        let slice = self.buffer.as_init();
1033        unsafe {
1034            libc::sendto(
1035                self.fd.as_fd().as_raw_fd(),
1036                slice.as_ptr() as _,
1037                slice.len(),
1038                self.flags,
1039                self.addr.as_ptr().cast(),
1040                self.addr.len(),
1041            )
1042        }
1043    }
1044}
1045
1046unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
1047    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1048        syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
1049    }
1050
1051    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1052        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1053    }
1054
1055    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1056        syscall!(break self.call())
1057    }
1058}
1059
1060impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
1061    type Inner = T;
1062
1063    fn into_inner(self) -> Self::Inner {
1064        self.buffer
1065    }
1066}
1067
1068pin_project! {
1069    /// Send data to specified address from vectored buffer.
1070    pub struct SendToVectored<T: IoVectoredBuf, S> {
1071        pub(crate) fd: S,
1072        #[pin]
1073        pub(crate) buffer: T,
1074        pub(crate) addr: SockAddr,
1075        pub(crate) slices: Vec<SysSlice>,
1076        pub(crate) msg: libc::msghdr,
1077        pub(crate) flags: i32,
1078        _p: PhantomPinned,
1079    }
1080}
1081
1082impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
1083    /// Create [`SendToVectored`].
1084    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1085        Self {
1086            fd,
1087            buffer,
1088            addr,
1089            slices: vec![],
1090            msg: unsafe { std::mem::zeroed() },
1091            flags,
1092            _p: PhantomPinned,
1093        }
1094    }
1095}
1096
1097impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
1098    fn set_msg(self: Pin<&mut Self>) {
1099        let this = self.project();
1100        *this.slices = this.buffer.as_ref().sys_slices();
1101        this.msg.msg_name = this.addr as *mut _ as _;
1102        this.msg.msg_namelen = this.addr.len() as _;
1103        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
1104        this.msg.msg_iovlen = this.slices.len() as _;
1105    }
1106
1107    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1108        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1109    }
1110}
1111
1112unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1113    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1114        self.as_mut().set_msg();
1115        let fd = self.fd.as_fd().as_raw_fd();
1116        syscall!(self.as_mut().call(), wait_writable(fd))
1117    }
1118
1119    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1120        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1121    }
1122
1123    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1124        syscall!(break self.as_mut().call())
1125    }
1126}
1127
1128impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1129    type Inner = T;
1130
1131    fn into_inner(self) -> Self::Inner {
1132        self.buffer
1133    }
1134}
1135
1136impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1137    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1138        let this = self.project();
1139        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
1140    }
1141}
1142
1143unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1144    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1145        self.as_mut().set_msg();
1146        let fd = self.fd.as_fd().as_raw_fd();
1147        syscall!(self.as_mut().call(), wait_readable(fd))
1148    }
1149
1150    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1151        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1152    }
1153
1154    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1155        syscall!(break self.as_mut().call())
1156    }
1157}
1158
1159impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1160    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1161        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1162    }
1163}
1164
1165unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1166    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1167        self.as_mut().set_msg();
1168        let fd = self.fd.as_fd().as_raw_fd();
1169        syscall!(self.as_mut().call(), wait_writable(fd))
1170    }
1171
1172    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1173        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1174    }
1175
1176    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1177        syscall!(break self.as_mut().call())
1178    }
1179}
1180
1181unsafe impl<S: AsFd> OpCode for PollOnce<S> {
1182    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1183        Ok(Decision::wait_for(
1184            self.fd.as_fd().as_raw_fd(),
1185            self.interest,
1186        ))
1187    }
1188
1189    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1190        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1191    }
1192
1193    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1194        Poll::Ready(Ok(0))
1195    }
1196}
1197
1198#[cfg(linux_all)]
1199unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
1200    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1201        use super::WaitArg;
1202
1203        Ok(Decision::wait_for_many([
1204            WaitArg::readable(self.fd_in.as_fd().as_raw_fd()),
1205            WaitArg::writable(self.fd_out.as_fd().as_raw_fd()),
1206        ]))
1207    }
1208
1209    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1210        Some(OpType::multi_fd([
1211            self.fd_in.as_fd().as_raw_fd(),
1212            self.fd_out.as_fd().as_raw_fd(),
1213        ]))
1214    }
1215
1216    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1217        let mut offset_in = self.offset_in;
1218        let mut offset_out = self.offset_out;
1219        let offset_in_ptr = if offset_in < 0 {
1220            std::ptr::null_mut()
1221        } else {
1222            &mut offset_in
1223        };
1224        let offset_out_ptr = if offset_out < 0 {
1225            std::ptr::null_mut()
1226        } else {
1227            &mut offset_out
1228        };
1229        syscall!(
1230            break libc::splice(
1231                self.fd_in.as_fd().as_raw_fd(),
1232                offset_in_ptr,
1233                self.fd_out.as_fd().as_raw_fd(),
1234                offset_out_ptr,
1235                self.len,
1236                self.flags | libc::SPLICE_F_NONBLOCK,
1237            )
1238        )
1239    }
1240}