Skip to main content

compio_driver/sys/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
13#[cfg(not(gnulinux))]
14use libc::open;
15#[cfg(gnulinux)]
16use libc::open64 as open;
17#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
18use libc::{pread, preadv, pwrite, pwritev};
19#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
20use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
21use pin_project_lite::pin_project;
22use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
23
24use super::{AsFd, Decision, OpCode, OpType, syscall};
25pub use crate::sys::unix_op::*;
26use crate::{op::*, sys_slice::*};
27
28impl<
29    D: std::marker::Send + 'static,
30    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
31> OpCode for Asyncify<F, D>
32{
33    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
34        Ok(Decision::Blocking)
35    }
36
37    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
38        let this = self.project();
39        let f = this
40            .f
41            .take()
42            .expect("the operate method could only be called once");
43        let BufResult(res, data) = f();
44        *this.data = Some(data);
45        Poll::Ready(res)
46    }
47}
48
49impl<
50    S,
51    D: std::marker::Send + 'static,
52    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
53> OpCode for AsyncifyFd<S, F, D>
54{
55    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
56        Ok(Decision::Blocking)
57    }
58
59    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
60        let this = self.project();
61        let f = this
62            .f
63            .take()
64            .expect("the operate method could only be called once");
65        let BufResult(res, data) = f(this.fd);
66        *this.data = Some(data);
67        Poll::Ready(res)
68    }
69}
70
71impl OpCode for OpenFile {
72    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
73        Ok(Decision::Blocking)
74    }
75
76    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
77        Poll::Ready(Ok(syscall!(open(
78            self.path.as_ptr(),
79            self.flags | libc::O_CLOEXEC,
80            self.mode as libc::c_int
81        ))? as _))
82    }
83}
84
85impl OpCode for CloseFile {
86    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
87        Ok(Decision::Blocking)
88    }
89
90    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
91        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
92    }
93}
94
95impl<S: AsFd> OpCode for TruncateFile<S> {
96    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
97        Ok(Decision::Blocking)
98    }
99
100    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
101        Poll::Ready(self.truncate())
102    }
103}
104
105pin_project! {
106    /// Get metadata of an opened file.
107    pub struct FileStat<S> {
108        pub(crate) fd: S,
109        pub(crate) stat: Stat,
110    }
111}
112
113impl<S> FileStat<S> {
114    /// Create [`FileStat`].
115    pub fn new(fd: S) -> Self {
116        Self {
117            fd,
118            stat: unsafe { std::mem::zeroed() },
119        }
120    }
121}
122
123impl<S: AsFd> OpCode for FileStat<S> {
124    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
125        Ok(Decision::Blocking)
126    }
127
128    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
129        let this = self.project();
130        #[cfg(gnulinux)]
131        {
132            let mut s: libc::statx = unsafe { std::mem::zeroed() };
133            static EMPTY_NAME: &[u8] = b"\0";
134            syscall!(libc::statx(
135                this.fd.as_fd().as_raw_fd(),
136                EMPTY_NAME.as_ptr().cast(),
137                libc::AT_EMPTY_PATH,
138                statx_mask(),
139                &mut s
140            ))?;
141            *this.stat = statx_to_stat(s);
142            Poll::Ready(Ok(0))
143        }
144        #[cfg(not(gnulinux))]
145        {
146            Poll::Ready(Ok(
147                syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), this.stat))? as _,
148            ))
149        }
150    }
151}
152
153impl<S> IntoInner for FileStat<S> {
154    type Inner = Stat;
155
156    fn into_inner(self) -> Self::Inner {
157        self.stat
158    }
159}
160
161/// Get metadata from path.
162pub struct PathStat {
163    pub(crate) path: CString,
164    pub(crate) stat: Stat,
165    pub(crate) follow_symlink: bool,
166}
167
168impl PathStat {
169    /// Create [`PathStat`].
170    pub fn new(path: CString, follow_symlink: bool) -> Self {
171        Self {
172            path,
173            stat: unsafe { std::mem::zeroed() },
174            follow_symlink,
175        }
176    }
177}
178
179impl OpCode for PathStat {
180    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
181        Ok(Decision::Blocking)
182    }
183
184    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
185        #[cfg(gnulinux)]
186        {
187            let mut flags = libc::AT_EMPTY_PATH;
188            if !self.follow_symlink {
189                flags |= libc::AT_SYMLINK_NOFOLLOW;
190            }
191            let mut s: libc::statx = unsafe { std::mem::zeroed() };
192            syscall!(libc::statx(
193                libc::AT_FDCWD,
194                self.path.as_ptr(),
195                flags,
196                statx_mask(),
197                &mut s
198            ))?;
199            self.stat = statx_to_stat(s);
200            Poll::Ready(Ok(0))
201        }
202        #[cfg(not(gnulinux))]
203        {
204            let f = if self.follow_symlink {
205                libc::stat
206            } else {
207                libc::lstat
208            };
209            Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
210        }
211    }
212}
213
214impl IntoInner for PathStat {
215    type Inner = Stat;
216
217    fn into_inner(self) -> Self::Inner {
218        self.stat
219    }
220}
221
222impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
223    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
224        #[cfg(aio)]
225        {
226            let this = self.project();
227            let slice = this.buffer.sys_slice_mut();
228
229            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
230            this.aiocb.aio_offset = *this.offset as _;
231            this.aiocb.aio_buf = slice.ptr().cast();
232            this.aiocb.aio_nbytes = slice.len();
233
234            Ok(Decision::aio(this.aiocb, libc::aio_read))
235        }
236        #[cfg(not(aio))]
237        {
238            Ok(Decision::Blocking)
239        }
240    }
241
242    #[cfg(aio)]
243    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
244        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
245    }
246
247    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
248        let fd = self.fd.as_fd().as_raw_fd();
249        let offset = self.offset;
250        let slice = self.project().buffer.sys_slice_mut();
251        syscall!(break pread(fd, slice.ptr() as _, slice.len() as _, offset as _,))
252    }
253}
254
255impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
256    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
257        #[cfg(freebsd)]
258        {
259            let this = self.project();
260            *this.slices = this.buffer.sys_slices_mut();
261
262            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
263            this.aiocb.aio_offset = *this.offset as _;
264            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
265            this.aiocb.aio_nbytes = this.slices.len();
266
267            Ok(Decision::aio(this.aiocb, libc::aio_readv))
268        }
269        #[cfg(not(freebsd))]
270        {
271            Ok(Decision::Blocking)
272        }
273    }
274
275    #[cfg(freebsd)]
276    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
277        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
278    }
279
280    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
281        let this = self.project();
282        *this.slices = this.buffer.sys_slices_mut();
283        syscall!(
284            break preadv(
285                this.fd.as_fd().as_raw_fd(),
286                this.slices.as_ptr() as _,
287                this.slices.len() as _,
288                *this.offset as _,
289            )
290        )
291    }
292}
293
294impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
295    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
296        #[cfg(aio)]
297        {
298            let this = self.project();
299            let slice = this.buffer.as_ref().sys_slice();
300
301            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
302            this.aiocb.aio_offset = *this.offset as _;
303            this.aiocb.aio_buf = slice.ptr().cast();
304            this.aiocb.aio_nbytes = slice.len();
305
306            Ok(Decision::aio(this.aiocb, libc::aio_write))
307        }
308        #[cfg(not(aio))]
309        {
310            Ok(Decision::Blocking)
311        }
312    }
313
314    #[cfg(aio)]
315    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
316        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
317    }
318
319    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
320        let slice = self.buffer.as_init();
321        syscall!(
322            break pwrite(
323                self.fd.as_fd().as_raw_fd(),
324                slice.as_ptr() as _,
325                slice.len() as _,
326                self.offset as _,
327            )
328        )
329    }
330}
331
332impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
333    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
334        #[cfg(freebsd)]
335        {
336            let this = self.project();
337            *this.slices = this.buffer.as_ref().sys_slices();
338
339            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
340            this.aiocb.aio_offset = *this.offset as _;
341            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
342            this.aiocb.aio_nbytes = this.slices.len();
343
344            Ok(Decision::aio(this.aiocb, libc::aio_writev))
345        }
346        #[cfg(not(freebsd))]
347        {
348            Ok(Decision::Blocking)
349        }
350    }
351
352    #[cfg(freebsd)]
353    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
354        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
355    }
356
357    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
358        let this = self.project();
359        *this.slices = this.buffer.as_ref().sys_slices();
360        syscall!(
361            break pwritev(
362                this.fd.as_fd().as_raw_fd(),
363                this.slices.as_ptr() as _,
364                this.slices.len() as _,
365                *this.offset as _,
366            )
367        )
368    }
369}
370
371impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
372    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
373        self.project().op.pre_submit()
374    }
375
376    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
377        self.project().op.op_type()
378    }
379
380    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
381        self.project().op.operate()
382    }
383}
384
385impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
386    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
387        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
388    }
389
390    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
391        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
392    }
393
394    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
395        let fd = self.fd.as_fd().as_raw_fd();
396        let slice = self.project().buffer.sys_slice_mut();
397        syscall!(break libc::read(fd, slice.ptr() as _, slice.len()))
398    }
399}
400
401impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
402    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
403        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
404    }
405
406    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
407        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
408    }
409
410    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
411        let this = self.project();
412        *this.slices = this.buffer.sys_slices_mut();
413        syscall!(
414            break libc::readv(
415                this.fd.as_fd().as_raw_fd(),
416                this.slices.as_ptr() as _,
417                this.slices.len() as _
418            )
419        )
420    }
421}
422
423impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
424    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
425        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
426    }
427
428    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
429        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
430    }
431
432    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
433        let slice = self.buffer.as_init();
434        syscall!(
435            break libc::write(
436                self.fd.as_fd().as_raw_fd(),
437                slice.as_ptr() as _,
438                slice.len()
439            )
440        )
441    }
442}
443
444impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
445    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
446        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
447    }
448
449    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
450        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
451    }
452
453    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
454        let this = self.project();
455        *this.slices = this.buffer.as_ref().sys_slices();
456        syscall!(
457            break libc::writev(
458                this.fd.as_fd().as_raw_fd(),
459                this.slices.as_ptr() as _,
460                this.slices.len() as _
461            )
462        )
463    }
464}
465
466impl<S: AsFd> OpCode for crate::op::managed::ReadManaged<S> {
467    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
468        self.project().op.pre_submit()
469    }
470
471    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
472        self.project().op.op_type()
473    }
474
475    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
476        self.project().op.operate()
477    }
478}
479
480impl<S: AsFd> OpCode for Sync<S> {
481    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
482        #[cfg(aio)]
483        {
484            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
485                unsafe { libc::aio_fsync(libc::O_SYNC, aiocbp) }
486            }
487            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
488                unsafe { libc::aio_fsync(libc::O_DSYNC, aiocbp) }
489            }
490
491            let this = self.project();
492            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
493
494            let f = if *this.datasync {
495                aio_fdatasync
496            } else {
497                aio_fsync
498            };
499
500            Ok(Decision::aio(this.aiocb, f))
501        }
502        #[cfg(not(aio))]
503        {
504            Ok(Decision::Blocking)
505        }
506    }
507
508    #[cfg(aio)]
509    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
510        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
511    }
512
513    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
514        #[cfg(datasync)]
515        {
516            Poll::Ready(Ok(syscall!(if self.datasync {
517                libc::fdatasync(self.fd.as_fd().as_raw_fd())
518            } else {
519                libc::fsync(self.fd.as_fd().as_raw_fd())
520            })? as _))
521        }
522        #[cfg(not(datasync))]
523        {
524            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
525        }
526    }
527}
528
529impl OpCode for Unlink {
530    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
531        Ok(Decision::Blocking)
532    }
533
534    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
535        if self.dir {
536            syscall!(libc::rmdir(self.path.as_ptr()))?;
537        } else {
538            syscall!(libc::unlink(self.path.as_ptr()))?;
539        }
540        Poll::Ready(Ok(0))
541    }
542}
543
544impl OpCode for CreateDir {
545    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
546        Ok(Decision::Blocking)
547    }
548
549    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
550        syscall!(libc::mkdir(self.path.as_ptr(), self.mode))?;
551        Poll::Ready(Ok(0))
552    }
553}
554
555impl OpCode for Rename {
556    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
557        Ok(Decision::Blocking)
558    }
559
560    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
561        syscall!(libc::rename(self.old_path.as_ptr(), self.new_path.as_ptr()))?;
562        Poll::Ready(Ok(0))
563    }
564}
565
566impl OpCode for Symlink {
567    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
568        Ok(Decision::Blocking)
569    }
570
571    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
572        syscall!(libc::symlink(self.source.as_ptr(), self.target.as_ptr()))?;
573        Poll::Ready(Ok(0))
574    }
575}
576
577impl OpCode for HardLink {
578    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
579        Ok(Decision::Blocking)
580    }
581
582    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
583        syscall!(libc::link(self.source.as_ptr(), self.target.as_ptr()))?;
584        Poll::Ready(Ok(0))
585    }
586}
587
588impl CreateSocket {
589    unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
590        #[allow(unused_mut)]
591        let mut ty: i32 = self.socket_type;
592        #[cfg(any(
593            target_os = "android",
594            target_os = "dragonfly",
595            target_os = "freebsd",
596            target_os = "fuchsia",
597            target_os = "hurd",
598            target_os = "illumos",
599            target_os = "linux",
600            target_os = "netbsd",
601            target_os = "openbsd",
602            target_os = "cygwin",
603        ))]
604        {
605            ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
606        }
607        let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
608        let socket = unsafe { Socket2::from_raw_fd(fd) };
609        #[cfg(not(any(
610            target_os = "android",
611            target_os = "dragonfly",
612            target_os = "freebsd",
613            target_os = "fuchsia",
614            target_os = "hurd",
615            target_os = "illumos",
616            target_os = "linux",
617            target_os = "netbsd",
618            target_os = "openbsd",
619            target_os = "espidf",
620            target_os = "vita",
621            target_os = "cygwin",
622        )))]
623        socket.set_cloexec(true)?;
624        #[cfg(any(
625            target_os = "ios",
626            target_os = "macos",
627            target_os = "tvos",
628            target_os = "watchos",
629        ))]
630        socket.set_nosigpipe(true)?;
631        #[cfg(not(any(
632            target_os = "android",
633            target_os = "dragonfly",
634            target_os = "freebsd",
635            target_os = "fuchsia",
636            target_os = "hurd",
637            target_os = "illumos",
638            target_os = "linux",
639            target_os = "netbsd",
640            target_os = "openbsd",
641            target_os = "cygwin",
642        )))]
643        socket.set_nonblocking(true)?;
644        Ok(socket.into_raw_fd())
645    }
646}
647
648impl OpCode for CreateSocket {
649    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
650        Ok(Decision::Blocking)
651    }
652
653    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
654        Poll::Ready(Ok(unsafe { self.call()? } as _))
655    }
656}
657
658impl<S: AsFd> OpCode for ShutdownSocket<S> {
659    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
660        Ok(Decision::Blocking)
661    }
662
663    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
664        Poll::Ready(Ok(
665            syscall!(libc::shutdown(self.fd.as_fd().as_raw_fd(), self.how()))? as _,
666        ))
667    }
668}
669
670impl OpCode for CloseSocket {
671    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
672        Ok(Decision::Blocking)
673    }
674
675    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
676        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
677    }
678}
679
680impl<S: AsFd> Accept<S> {
681    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
682        let this = self.project();
683        #[cfg(any(
684            target_os = "android",
685            target_os = "dragonfly",
686            target_os = "freebsd",
687            target_os = "fuchsia",
688            target_os = "illumos",
689            target_os = "linux",
690            target_os = "netbsd",
691            target_os = "openbsd",
692            target_os = "cygwin",
693        ))]
694        unsafe {
695            libc::accept4(
696                this.fd.as_fd().as_raw_fd(),
697                this.buffer as *mut _ as *mut _,
698                this.addr_len,
699                libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
700            )
701        }
702        #[cfg(not(any(
703            target_os = "android",
704            target_os = "dragonfly",
705            target_os = "freebsd",
706            target_os = "fuchsia",
707            target_os = "illumos",
708            target_os = "linux",
709            target_os = "netbsd",
710            target_os = "openbsd",
711            target_os = "cygwin",
712        )))]
713        {
714            || -> io::Result<libc::c_int> {
715                let fd = syscall!(libc::accept(
716                    this.fd.as_fd().as_raw_fd(),
717                    this.buffer as *mut _ as *mut _,
718                    this.addr_len,
719                ))?;
720                let socket = unsafe { Socket2::from_raw_fd(fd) };
721                socket.set_cloexec(true)?;
722                socket.set_nonblocking(true)?;
723                Ok(socket.into_raw_fd())
724            }()
725            .unwrap_or(-1)
726        }
727    }
728}
729
730impl<S: AsFd> OpCode for Accept<S> {
731    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
732        let fd = self.fd.as_fd().as_raw_fd();
733        syscall!(self.as_mut().call(), wait_readable(fd))
734    }
735
736    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
737        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
738    }
739
740    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
741        let res = syscall!(break self.as_mut().call());
742        if let Poll::Ready(Ok(fd)) = res {
743            // Safety: we own the fd returned by accept/accept4
744            let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
745            *self.project().accepted_fd = Some(fd);
746        }
747        res
748    }
749}
750
751impl<S: AsFd> OpCode for Connect<S> {
752    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
753        syscall!(
754            libc::connect(
755                self.fd.as_fd().as_raw_fd(),
756                self.addr.as_ptr().cast(),
757                self.addr.len()
758            ),
759            wait_writable(self.fd.as_fd().as_raw_fd())
760        )
761    }
762
763    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
764        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
765    }
766
767    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
768        let mut err: libc::c_int = 0;
769        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
770
771        syscall!(libc::getsockopt(
772            self.fd.as_fd().as_raw_fd(),
773            libc::SOL_SOCKET,
774            libc::SO_ERROR,
775            &mut err as *mut _ as *mut _,
776            &mut err_len
777        ))?;
778
779        let res = if err == 0 {
780            Ok(0)
781        } else {
782            Err(io::Error::from_raw_os_error(err))
783        };
784        Poll::Ready(res)
785    }
786}
787
788impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
789    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
790        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
791    }
792
793    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
794        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
795    }
796
797    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
798        let fd = self.fd.as_fd().as_raw_fd();
799        let flags = self.flags;
800        let slice = self.project().buffer.sys_slice_mut();
801        syscall!(break libc::recv(fd, slice.ptr() as _, slice.len(), flags))
802    }
803}
804
805impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
806    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
807        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
808    }
809
810    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
811        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
812    }
813
814    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
815        let slice = self.buffer.as_init();
816        syscall!(
817            break libc::send(
818                self.fd.as_fd().as_raw_fd(),
819                slice.as_ptr() as _,
820                slice.len(),
821                self.flags,
822            )
823        )
824    }
825}
826
827impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
828    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
829        self.project().op.pre_submit()
830    }
831
832    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
833        self.project().op.op_type()
834    }
835
836    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
837        self.project().op.operate()
838    }
839}
840
841impl<T: IoVectoredBufMut, S: AsFd> RecvVectored<T, S> {
842    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
843        let this = self.project();
844        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
845    }
846}
847
848impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
849    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
850        self.as_mut().set_msg();
851        let fd = self.fd.as_fd().as_raw_fd();
852        syscall!(self.as_mut().call(), wait_readable(fd))
853    }
854
855    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
856        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
857    }
858
859    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
860        syscall!(break self.as_mut().call())
861    }
862}
863
864impl<T: IoVectoredBuf, S: AsFd> SendVectored<T, S> {
865    unsafe fn call(&self) -> libc::ssize_t {
866        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
867    }
868}
869
870impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
871    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
872        self.as_mut().set_msg();
873        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
874        syscall!(self.as_mut().call(), wait_writable(fd))
875    }
876
877    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
878        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
879    }
880
881    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
882        syscall!(break self.call())
883    }
884}
885
886pin_project! {
887    /// Receive data and source address.
888    pub struct RecvFrom<T: IoBufMut, S> {
889        pub(crate) fd: S,
890        #[pin]
891        pub(crate) buffer: T,
892        pub(crate) addr: SockAddrStorage,
893        pub(crate) addr_len: socklen_t,
894        pub(crate) flags: i32,
895        _p: PhantomPinned,
896    }
897}
898
899impl<T: IoBufMut, S> RecvFrom<T, S> {
900    /// Create [`RecvFrom`].
901    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
902        let addr = SockAddrStorage::zeroed();
903        let addr_len = addr.size_of();
904        Self {
905            fd,
906            buffer,
907            addr,
908            addr_len,
909            flags,
910            _p: PhantomPinned,
911        }
912    }
913}
914
915impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
916    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
917        let fd = self.fd.as_fd().as_raw_fd();
918        let this = self.project();
919        let slice = this.buffer.sys_slice_mut();
920        unsafe {
921            libc::recvfrom(
922                fd,
923                slice.ptr() as _,
924                slice.len(),
925                *this.flags,
926                this.addr as *mut _ as _,
927                this.addr_len,
928            )
929        }
930    }
931}
932
933impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
934    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
935        let fd = self.fd.as_fd().as_raw_fd();
936        syscall!(self.as_mut().call(), wait_readable(fd))
937    }
938
939    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
940        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
941    }
942
943    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
944        syscall!(break self.as_mut().call())
945    }
946}
947
948impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
949    type Inner = (T, SockAddrStorage, socklen_t);
950
951    fn into_inner(self) -> Self::Inner {
952        (self.buffer, self.addr, self.addr_len)
953    }
954}
955
956pin_project! {
957    /// Receive data and source address into vectored buffer.
958    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
959        pub(crate) fd: S,
960        #[pin]
961        pub(crate) buffer: T,
962        pub(crate) slices: Vec<SysSlice>,
963        pub(crate) addr: SockAddrStorage,
964        pub(crate) msg: libc::msghdr,
965        pub(crate) flags: i32,
966        _p: PhantomPinned,
967    }
968}
969
970impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
971    /// Create [`RecvFromVectored`].
972    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
973        Self {
974            fd,
975            buffer,
976            slices: vec![],
977            addr: SockAddrStorage::zeroed(),
978            msg: unsafe { std::mem::zeroed() },
979            flags,
980            _p: PhantomPinned,
981        }
982    }
983}
984
985impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
986    fn set_msg(self: Pin<&mut Self>) {
987        let this = self.project();
988        *this.slices = this.buffer.sys_slices_mut();
989        this.msg.msg_name = this.addr as *mut _ as _;
990        this.msg.msg_namelen = this.addr.size_of() as _;
991        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
992        this.msg.msg_iovlen = this.slices.len() as _;
993    }
994
995    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
996        let this = self.project();
997        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
998    }
999}
1000
1001impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
1002    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1003        self.as_mut().set_msg();
1004        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
1005        syscall!(self.as_mut().call(), wait_readable(fd))
1006    }
1007
1008    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1009        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1010    }
1011
1012    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1013        syscall!(break self.as_mut().call())
1014    }
1015}
1016
1017impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
1018    type Inner = (T, SockAddrStorage, socklen_t);
1019
1020    fn into_inner(self) -> Self::Inner {
1021        (self.buffer, self.addr, self.msg.msg_namelen)
1022    }
1023}
1024
1025pin_project! {
1026    /// Send data to specified address.
1027    pub struct SendTo<T: IoBuf, S> {
1028        pub(crate) fd: S,
1029        pub(crate) buffer: T,
1030        pub(crate) addr: SockAddr,
1031        flags: i32,
1032        _p: PhantomPinned,
1033    }
1034}
1035
1036impl<T: IoBuf, S> SendTo<T, S> {
1037    /// Create [`SendTo`].
1038    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1039        Self {
1040            fd,
1041            buffer,
1042            addr,
1043            flags,
1044            _p: PhantomPinned,
1045        }
1046    }
1047}
1048
1049impl<T: IoBuf, S: AsFd> SendTo<T, S> {
1050    unsafe fn call(&self) -> libc::ssize_t {
1051        let slice = self.buffer.as_init();
1052        unsafe {
1053            libc::sendto(
1054                self.fd.as_fd().as_raw_fd(),
1055                slice.as_ptr() as _,
1056                slice.len(),
1057                self.flags,
1058                self.addr.as_ptr().cast(),
1059                self.addr.len(),
1060            )
1061        }
1062    }
1063}
1064
1065impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
1066    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1067        syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
1068    }
1069
1070    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1071        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1072    }
1073
1074    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1075        syscall!(break self.call())
1076    }
1077}
1078
1079impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
1080    type Inner = T;
1081
1082    fn into_inner(self) -> Self::Inner {
1083        self.buffer
1084    }
1085}
1086
1087pin_project! {
1088    /// Send data to specified address from vectored buffer.
1089    pub struct SendToVectored<T: IoVectoredBuf, S> {
1090        pub(crate) fd: S,
1091        #[pin]
1092        pub(crate) buffer: T,
1093        pub(crate) addr: SockAddr,
1094        pub(crate) slices: Vec<SysSlice>,
1095        pub(crate) msg: libc::msghdr,
1096        pub(crate) flags: i32,
1097        _p: PhantomPinned,
1098    }
1099}
1100
1101impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
1102    /// Create [`SendToVectored`].
1103    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1104        Self {
1105            fd,
1106            buffer,
1107            addr,
1108            slices: vec![],
1109            msg: unsafe { std::mem::zeroed() },
1110            flags,
1111            _p: PhantomPinned,
1112        }
1113    }
1114}
1115
1116impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
1117    fn set_msg(self: Pin<&mut Self>) {
1118        let this = self.project();
1119        *this.slices = this.buffer.as_ref().sys_slices();
1120        this.msg.msg_name = this.addr as *mut _ as _;
1121        this.msg.msg_namelen = this.addr.len() as _;
1122        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
1123        this.msg.msg_iovlen = this.slices.len() as _;
1124    }
1125
1126    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1127        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1128    }
1129}
1130
1131impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1132    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1133        self.as_mut().set_msg();
1134        let fd = self.fd.as_fd().as_raw_fd();
1135        syscall!(self.as_mut().call(), wait_writable(fd))
1136    }
1137
1138    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1139        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1140    }
1141
1142    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1143        syscall!(break self.as_mut().call())
1144    }
1145}
1146
1147impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1148    type Inner = T;
1149
1150    fn into_inner(self) -> Self::Inner {
1151        self.buffer
1152    }
1153}
1154
1155impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1156    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1157        let this = self.project();
1158        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
1159    }
1160}
1161
1162impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1163    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1164        self.as_mut().set_msg();
1165        let fd = self.fd.as_fd().as_raw_fd();
1166        syscall!(self.as_mut().call(), wait_readable(fd))
1167    }
1168
1169    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1170        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1171    }
1172
1173    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1174        syscall!(break self.as_mut().call())
1175    }
1176}
1177
1178impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1179    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1180        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1181    }
1182}
1183
1184impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1185    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1186        self.as_mut().set_msg();
1187        let fd = self.fd.as_fd().as_raw_fd();
1188        syscall!(self.as_mut().call(), wait_writable(fd))
1189    }
1190
1191    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1192        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1193    }
1194
1195    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1196        syscall!(break self.as_mut().call())
1197    }
1198}
1199
1200impl<S: AsFd> OpCode for PollOnce<S> {
1201    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1202        Ok(Decision::wait_for(
1203            self.fd.as_fd().as_raw_fd(),
1204            self.interest,
1205        ))
1206    }
1207
1208    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1209        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1210    }
1211
1212    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1213        Poll::Ready(Ok(0))
1214    }
1215}
1216
1217#[cfg(linux_all)]
1218impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
1219    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1220        use super::WaitArg;
1221
1222        Ok(Decision::wait_for_many([
1223            WaitArg::readable(self.fd_in.as_fd().as_raw_fd()),
1224            WaitArg::writable(self.fd_out.as_fd().as_raw_fd()),
1225        ]))
1226    }
1227
1228    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1229        Some(OpType::multi_fd([
1230            self.fd_in.as_fd().as_raw_fd(),
1231            self.fd_out.as_fd().as_raw_fd(),
1232        ]))
1233    }
1234
1235    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1236        let mut offset_in = self.offset_in;
1237        let mut offset_out = self.offset_out;
1238        let offset_in_ptr = if offset_in < 0 {
1239            std::ptr::null_mut()
1240        } else {
1241            &mut offset_in
1242        };
1243        let offset_out_ptr = if offset_out < 0 {
1244            std::ptr::null_mut()
1245        } else {
1246            &mut offset_out
1247        };
1248        // We don't wait for `fd_out` here. It's users' responsibility to ensure it's
1249        // writable.
1250        Poll::Ready(Ok(syscall!(libc::splice(
1251            self.fd_in.as_fd().as_raw_fd(),
1252            offset_in_ptr,
1253            self.fd_out.as_fd().as_raw_fd(),
1254            offset_out_ptr,
1255            self.len,
1256            self.flags as _,
1257        ))? as _))
1258    }
1259}