Skip to main content

compio_driver/sys/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
13#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
14use libc::{pread, preadv, pwrite, pwritev};
15#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
16use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
17use pin_project_lite::pin_project;
18use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
19
20use super::{AsFd, Decision, OpCode, OpType, syscall};
21pub use crate::sys::unix_op::*;
22use crate::{op::*, sys_slice::*};
23
24unsafe impl<
25    D: std::marker::Send + 'static,
26    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
27> OpCode for Asyncify<F, D>
28{
29    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
30        Ok(Decision::Blocking)
31    }
32
33    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
34        let this = self.project();
35        let f = this
36            .f
37            .take()
38            .expect("the operate method could only be called once");
39        let BufResult(res, data) = f();
40        *this.data = Some(data);
41        Poll::Ready(res)
42    }
43}
44
45unsafe impl<
46    S,
47    D: std::marker::Send + 'static,
48    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
49> OpCode for AsyncifyFd<S, F, D>
50{
51    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
52        Ok(Decision::Blocking)
53    }
54
55    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
56        let this = self.project();
57        let f = this
58            .f
59            .take()
60            .expect("the operate method could only be called once");
61        let BufResult(res, data) = f(this.fd);
62        *this.data = Some(data);
63        Poll::Ready(res)
64    }
65}
66
67unsafe impl OpCode for OpenFile {
68    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
69        Ok(Decision::Blocking)
70    }
71
72    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
73        Poll::Ready(self.call())
74    }
75}
76
77unsafe impl OpCode for CloseFile {
78    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
79        Ok(Decision::Blocking)
80    }
81
82    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
83        Poll::Ready(self.call())
84    }
85}
86
87unsafe impl<S: AsFd> OpCode for TruncateFile<S> {
88    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
89        Ok(Decision::Blocking)
90    }
91
92    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
93        Poll::Ready(self.call())
94    }
95}
96
pin_project! {
    /// Get metadata of an opened file.
    pub struct FileStat<S> {
        // Handle whose metadata is queried.
        pub(crate) fd: S,
        // Receives the metadata; zero-initialised by `new` and handed out by
        // `into_inner`.
        pub(crate) stat: Stat,
    }
}
104
105impl<S> FileStat<S> {
106    /// Create [`FileStat`].
107    pub fn new(fd: S) -> Self {
108        Self {
109            fd,
110            stat: unsafe { std::mem::zeroed() },
111        }
112    }
113}
114
115unsafe impl<S: AsFd> OpCode for FileStat<S> {
116    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
117        Ok(Decision::Blocking)
118    }
119
120    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
121        let this = self.project();
122        #[cfg(gnulinux)]
123        {
124            let mut s: libc::statx = unsafe { std::mem::zeroed() };
125            static EMPTY_NAME: &[u8] = b"\0";
126            syscall!(libc::statx(
127                this.fd.as_fd().as_raw_fd(),
128                EMPTY_NAME.as_ptr().cast(),
129                libc::AT_EMPTY_PATH,
130                statx_mask(),
131                &mut s
132            ))?;
133            *this.stat = statx_to_stat(s);
134            Poll::Ready(Ok(0))
135        }
136        #[cfg(not(gnulinux))]
137        {
138            Poll::Ready(Ok(
139                syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), this.stat))? as _,
140            ))
141        }
142    }
143}
144
145impl<S> IntoInner for FileStat<S> {
146    type Inner = Stat;
147
148    fn into_inner(self) -> Self::Inner {
149        self.stat
150    }
151}
152
/// Get metadata from path.
pub struct PathStat {
    /// NUL-terminated path passed to the stat call.
    pub(crate) path: CString,
    /// Receives the metadata; zero-initialised by [`PathStat::new`].
    pub(crate) stat: Stat,
    /// When `false`, the lookup is done without following a trailing symlink
    /// (`AT_SYMLINK_NOFOLLOW` / `lstat`).
    pub(crate) follow_symlink: bool,
}
159
160impl PathStat {
161    /// Create [`PathStat`].
162    pub fn new(path: CString, follow_symlink: bool) -> Self {
163        Self {
164            path,
165            stat: unsafe { std::mem::zeroed() },
166            follow_symlink,
167        }
168    }
169}
170
171unsafe impl OpCode for PathStat {
172    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
173        Ok(Decision::Blocking)
174    }
175
176    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
177        #[cfg(gnulinux)]
178        {
179            let mut flags = libc::AT_EMPTY_PATH;
180            if !self.follow_symlink {
181                flags |= libc::AT_SYMLINK_NOFOLLOW;
182            }
183            let mut s: libc::statx = unsafe { std::mem::zeroed() };
184            syscall!(libc::statx(
185                libc::AT_FDCWD,
186                self.path.as_ptr(),
187                flags,
188                statx_mask(),
189                &mut s
190            ))?;
191            self.stat = statx_to_stat(s);
192            Poll::Ready(Ok(0))
193        }
194        #[cfg(not(gnulinux))]
195        {
196            let f = if self.follow_symlink {
197                libc::stat
198            } else {
199                libc::lstat
200            };
201            Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
202        }
203    }
204}
205
206impl IntoInner for PathStat {
207    type Inner = Stat;
208
209    fn into_inner(self) -> Self::Inner {
210        self.stat
211    }
212}
213
214unsafe impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
215    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
216        #[cfg(aio)]
217        {
218            let this = self.project();
219            let slice = this.buffer.sys_slice_mut();
220
221            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
222            this.aiocb.aio_offset = *this.offset as _;
223            this.aiocb.aio_buf = slice.ptr().cast();
224            this.aiocb.aio_nbytes = slice.len();
225
226            Ok(Decision::aio(this.aiocb, libc::aio_read))
227        }
228        #[cfg(not(aio))]
229        {
230            Ok(Decision::Blocking)
231        }
232    }
233
234    #[cfg(aio)]
235    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
236        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
237    }
238
239    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
240        let fd = self.fd.as_fd().as_raw_fd();
241        let offset = self.offset;
242        let slice = self.project().buffer.sys_slice_mut();
243        syscall!(break pread(fd, slice.ptr() as _, slice.len() as _, offset as _,))
244    }
245}
246
247unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
248    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
249        #[cfg(freebsd)]
250        {
251            let this = self.project();
252            *this.slices = this.buffer.sys_slices_mut();
253
254            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
255            this.aiocb.aio_offset = *this.offset as _;
256            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
257            this.aiocb.aio_nbytes = this.slices.len();
258
259            Ok(Decision::aio(this.aiocb, libc::aio_readv))
260        }
261        #[cfg(not(freebsd))]
262        {
263            Ok(Decision::Blocking)
264        }
265    }
266
267    #[cfg(freebsd)]
268    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
269        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
270    }
271
272    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
273        let this = self.project();
274        *this.slices = this.buffer.sys_slices_mut();
275        syscall!(
276            break preadv(
277                this.fd.as_fd().as_raw_fd(),
278                this.slices.as_ptr() as _,
279                this.slices.len() as _,
280                *this.offset as _,
281            )
282        )
283    }
284}
285
286unsafe impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
287    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
288        #[cfg(aio)]
289        {
290            let this = self.project();
291            let slice = this.buffer.as_ref().sys_slice();
292
293            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
294            this.aiocb.aio_offset = *this.offset as _;
295            this.aiocb.aio_buf = slice.ptr().cast();
296            this.aiocb.aio_nbytes = slice.len();
297
298            Ok(Decision::aio(this.aiocb, libc::aio_write))
299        }
300        #[cfg(not(aio))]
301        {
302            Ok(Decision::Blocking)
303        }
304    }
305
306    #[cfg(aio)]
307    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
308        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
309    }
310
311    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
312        let slice = self.buffer.as_init();
313        syscall!(
314            break pwrite(
315                self.fd.as_fd().as_raw_fd(),
316                slice.as_ptr() as _,
317                slice.len() as _,
318                self.offset as _,
319            )
320        )
321    }
322}
323
324unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
325    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
326        #[cfg(freebsd)]
327        {
328            let this = self.project();
329            *this.slices = this.buffer.as_ref().sys_slices();
330
331            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
332            this.aiocb.aio_offset = *this.offset as _;
333            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
334            this.aiocb.aio_nbytes = this.slices.len();
335
336            Ok(Decision::aio(this.aiocb, libc::aio_writev))
337        }
338        #[cfg(not(freebsd))]
339        {
340            Ok(Decision::Blocking)
341        }
342    }
343
344    #[cfg(freebsd)]
345    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
346        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
347    }
348
349    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
350        let this = self.project();
351        *this.slices = this.buffer.as_ref().sys_slices();
352        syscall!(
353            break pwritev(
354                this.fd.as_fd().as_raw_fd(),
355                this.slices.as_ptr() as _,
356                this.slices.len() as _,
357                *this.offset as _,
358            )
359        )
360    }
361}
362
363unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
364    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
365        self.project().op.pre_submit()
366    }
367
368    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
369        self.project().op.op_type()
370    }
371
372    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
373        self.project().op.operate()
374    }
375}
376
377unsafe impl<T: IoBufMut, S: AsFd> OpCode for Read<T, S> {
378    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
379        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
380    }
381
382    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
383        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
384    }
385
386    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
387        let fd = self.fd.as_fd().as_raw_fd();
388        let slice = self.project().buffer.sys_slice_mut();
389        syscall!(break libc::read(fd, slice.ptr() as _, slice.len()))
390    }
391}
392
393unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectored<T, S> {
394    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
395        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
396    }
397
398    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
399        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
400    }
401
402    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
403        let this = self.project();
404        *this.slices = this.buffer.sys_slices_mut();
405        syscall!(
406            break libc::readv(
407                this.fd.as_fd().as_raw_fd(),
408                this.slices.as_ptr() as _,
409                this.slices.len() as _
410            )
411        )
412    }
413}
414
415unsafe impl<T: IoBuf, S: AsFd> OpCode for Write<T, S> {
416    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
417        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
418    }
419
420    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
421        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
422    }
423
424    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
425        let slice = self.buffer.as_init();
426        syscall!(
427            break libc::write(
428                self.fd.as_fd().as_raw_fd(),
429                slice.as_ptr() as _,
430                slice.len()
431            )
432        )
433    }
434}
435
436unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectored<T, S> {
437    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
438        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
439    }
440
441    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
442        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
443    }
444
445    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
446        let this = self.project();
447        *this.slices = this.buffer.as_ref().sys_slices();
448        syscall!(
449            break libc::writev(
450                this.fd.as_fd().as_raw_fd(),
451                this.slices.as_ptr() as _,
452                this.slices.len() as _
453            )
454        )
455    }
456}
457
458unsafe impl<S: AsFd> OpCode for crate::op::managed::ReadManaged<S> {
459    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
460        self.project().op.pre_submit()
461    }
462
463    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
464        self.project().op.op_type()
465    }
466
467    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
468        self.project().op.operate()
469    }
470}
471
472unsafe impl<S: AsFd> OpCode for Sync<S> {
473    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
474        #[cfg(aio)]
475        {
476            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
477                unsafe { libc::aio_fsync(libc::O_SYNC, aiocbp) }
478            }
479            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
480                unsafe { libc::aio_fsync(libc::O_DSYNC, aiocbp) }
481            }
482
483            let this = self.project();
484            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
485
486            let f = if *this.datasync {
487                aio_fdatasync
488            } else {
489                aio_fsync
490            };
491
492            Ok(Decision::aio(this.aiocb, f))
493        }
494        #[cfg(not(aio))]
495        {
496            Ok(Decision::Blocking)
497        }
498    }
499
500    #[cfg(aio)]
501    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
502        Some(OpType::Aio(NonNull::from(self.project().aiocb)))
503    }
504
505    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
506        #[cfg(datasync)]
507        {
508            Poll::Ready(Ok(syscall!(if self.datasync {
509                libc::fdatasync(self.fd.as_fd().as_raw_fd())
510            } else {
511                libc::fsync(self.fd.as_fd().as_raw_fd())
512            })? as _))
513        }
514        #[cfg(not(datasync))]
515        {
516            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
517        }
518    }
519}
520
521unsafe impl OpCode for Unlink {
522    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
523        Ok(Decision::Blocking)
524    }
525
526    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
527        Poll::Ready(self.call())
528    }
529}
530
531unsafe impl OpCode for CreateDir {
532    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
533        Ok(Decision::Blocking)
534    }
535
536    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
537        Poll::Ready(self.call())
538    }
539}
540
541unsafe impl OpCode for Rename {
542    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
543        Ok(Decision::Blocking)
544    }
545
546    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
547        Poll::Ready(self.call())
548    }
549}
550
551unsafe impl OpCode for Symlink {
552    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
553        Ok(Decision::Blocking)
554    }
555
556    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
557        Poll::Ready(self.call())
558    }
559}
560
561unsafe impl OpCode for HardLink {
562    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
563        Ok(Decision::Blocking)
564    }
565
566    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
567        Poll::Ready(self.call())
568    }
569}
570
571impl CreateSocket {
572    unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
573        #[allow(unused_mut)]
574        let mut ty: i32 = self.socket_type;
575        #[cfg(any(
576            target_os = "android",
577            target_os = "dragonfly",
578            target_os = "freebsd",
579            target_os = "fuchsia",
580            target_os = "hurd",
581            target_os = "illumos",
582            target_os = "linux",
583            target_os = "netbsd",
584            target_os = "openbsd",
585            target_os = "cygwin",
586        ))]
587        {
588            ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
589        }
590        let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
591        let socket = unsafe { Socket2::from_raw_fd(fd) };
592        #[cfg(not(any(
593            target_os = "android",
594            target_os = "dragonfly",
595            target_os = "freebsd",
596            target_os = "fuchsia",
597            target_os = "hurd",
598            target_os = "illumos",
599            target_os = "linux",
600            target_os = "netbsd",
601            target_os = "openbsd",
602            target_os = "espidf",
603            target_os = "vita",
604            target_os = "cygwin",
605        )))]
606        socket.set_cloexec(true)?;
607        #[cfg(any(
608            target_os = "ios",
609            target_os = "macos",
610            target_os = "tvos",
611            target_os = "watchos",
612        ))]
613        socket.set_nosigpipe(true)?;
614        #[cfg(not(any(
615            target_os = "android",
616            target_os = "dragonfly",
617            target_os = "freebsd",
618            target_os = "fuchsia",
619            target_os = "hurd",
620            target_os = "illumos",
621            target_os = "linux",
622            target_os = "netbsd",
623            target_os = "openbsd",
624            target_os = "cygwin",
625        )))]
626        socket.set_nonblocking(true)?;
627        Ok(socket.into_raw_fd())
628    }
629}
630
631unsafe impl OpCode for CreateSocket {
632    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
633        Ok(Decision::Blocking)
634    }
635
636    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
637        Poll::Ready(Ok(unsafe { self.call()? } as _))
638    }
639}
640
641unsafe impl<S: AsFd> OpCode for ShutdownSocket<S> {
642    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
643        Ok(Decision::Blocking)
644    }
645
646    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
647        Poll::Ready(self.call())
648    }
649}
650
651unsafe impl OpCode for CloseSocket {
652    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
653        Ok(Decision::Blocking)
654    }
655
656    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
657        Poll::Ready(self.call())
658    }
659}
660
661impl<S: AsFd> Accept<S> {
662    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
663        let this = self.project();
664        #[cfg(any(
665            target_os = "android",
666            target_os = "dragonfly",
667            target_os = "freebsd",
668            target_os = "fuchsia",
669            target_os = "illumos",
670            target_os = "linux",
671            target_os = "netbsd",
672            target_os = "openbsd",
673            target_os = "cygwin",
674        ))]
675        unsafe {
676            libc::accept4(
677                this.fd.as_fd().as_raw_fd(),
678                this.buffer as *mut _ as *mut _,
679                this.addr_len,
680                libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
681            )
682        }
683        #[cfg(not(any(
684            target_os = "android",
685            target_os = "dragonfly",
686            target_os = "freebsd",
687            target_os = "fuchsia",
688            target_os = "illumos",
689            target_os = "linux",
690            target_os = "netbsd",
691            target_os = "openbsd",
692            target_os = "cygwin",
693        )))]
694        {
695            || -> io::Result<libc::c_int> {
696                let fd = syscall!(libc::accept(
697                    this.fd.as_fd().as_raw_fd(),
698                    this.buffer as *mut _ as *mut _,
699                    this.addr_len,
700                ))?;
701                let socket = unsafe { Socket2::from_raw_fd(fd) };
702                socket.set_cloexec(true)?;
703                socket.set_nonblocking(true)?;
704                Ok(socket.into_raw_fd())
705            }()
706            .unwrap_or(-1)
707        }
708    }
709}
710
711unsafe impl<S: AsFd> OpCode for Accept<S> {
712    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
713        let fd = self.fd.as_fd().as_raw_fd();
714        syscall!(self.as_mut().call(), wait_readable(fd))
715    }
716
717    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
718        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
719    }
720
721    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
722        let res = syscall!(break self.as_mut().call());
723        if let Poll::Ready(Ok(fd)) = res {
724            // Safety: we own the fd returned by accept/accept4
725            let fd = unsafe { OwnedFd::from_raw_fd(fd as _) };
726            *self.project().accepted_fd = Some(fd);
727        }
728        res
729    }
730}
731
732unsafe impl<S: AsFd> OpCode for Connect<S> {
733    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
734        syscall!(
735            libc::connect(
736                self.fd.as_fd().as_raw_fd(),
737                self.addr.as_ptr().cast(),
738                self.addr.len()
739            ),
740            wait_writable(self.fd.as_fd().as_raw_fd())
741        )
742    }
743
744    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
745        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
746    }
747
748    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
749        let mut err: libc::c_int = 0;
750        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
751
752        syscall!(libc::getsockopt(
753            self.fd.as_fd().as_raw_fd(),
754            libc::SOL_SOCKET,
755            libc::SO_ERROR,
756            &mut err as *mut _ as *mut _,
757            &mut err_len
758        ))?;
759
760        let res = if err == 0 {
761            Ok(0)
762        } else {
763            Err(io::Error::from_raw_os_error(err))
764        };
765        Poll::Ready(res)
766    }
767}
768
769unsafe impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
770    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
771        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
772    }
773
774    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
775        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
776    }
777
778    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
779        let fd = self.fd.as_fd().as_raw_fd();
780        let flags = self.flags;
781        let slice = self.project().buffer.sys_slice_mut();
782        syscall!(break libc::recv(fd, slice.ptr() as _, slice.len(), flags))
783    }
784}
785
786unsafe impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
787    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
788        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
789    }
790
791    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
792        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
793    }
794
795    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
796        let slice = self.buffer.as_init();
797        syscall!(
798            break libc::send(
799                self.fd.as_fd().as_raw_fd(),
800                slice.as_ptr() as _,
801                slice.len(),
802                self.flags,
803            )
804        )
805    }
806}
807
808unsafe impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
809    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
810        self.project().op.pre_submit()
811    }
812
813    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
814        self.project().op.op_type()
815    }
816
817    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
818        self.project().op.operate()
819    }
820}
821
822unsafe impl<S: AsFd> OpCode for crate::op::managed::RecvFromManaged<S> {
823    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
824        self.project().op.pre_submit()
825    }
826
827    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
828        self.project().op.op_type()
829    }
830
831    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
832        self.project().op.operate()
833    }
834}
835
836impl<T: IoVectoredBufMut, S: AsFd> RecvVectored<T, S> {
837    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
838        let this = self.project();
839        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
840    }
841}
842
843unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
844    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
845        self.as_mut().set_msg();
846        let fd = self.fd.as_fd().as_raw_fd();
847        syscall!(self.as_mut().call(), wait_readable(fd))
848    }
849
850    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
851        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
852    }
853
854    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
855        syscall!(break self.as_mut().call())
856    }
857}
858
859impl<T: IoVectoredBuf, S: AsFd> SendVectored<T, S> {
860    unsafe fn call(&self) -> libc::ssize_t {
861        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
862    }
863}
864
865unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
866    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
867        self.as_mut().set_msg();
868        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
869        syscall!(self.as_mut().call(), wait_writable(fd))
870    }
871
872    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
873        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
874    }
875
876    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
877        syscall!(break self.call())
878    }
879}
880
pin_project! {
    /// Receive data and source address.
    pub struct RecvFrom<T: IoBufMut, S> {
        // Socket to receive from.
        pub(crate) fd: S,
        // Destination buffer for the received bytes.
        #[pin]
        pub(crate) buffer: T,
        // Storage passed to `recvfrom` for the source address.
        pub(crate) addr: SockAddrStorage,
        // In/out length for `addr`; initialised to the storage size by `new`.
        pub(crate) addr_len: socklen_t,
        // Flags forwarded to `recvfrom`.
        pub(crate) flags: i32,
        // Opts the type out of `Unpin`.
        _p: PhantomPinned,
    }
}
893
894impl<T: IoBufMut, S> RecvFrom<T, S> {
895    /// Create [`RecvFrom`].
896    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
897        let addr = SockAddrStorage::zeroed();
898        let addr_len = addr.size_of();
899        Self {
900            fd,
901            buffer,
902            addr,
903            addr_len,
904            flags,
905            _p: PhantomPinned,
906        }
907    }
908}
909
910impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
911    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
912        let fd = self.fd.as_fd().as_raw_fd();
913        let this = self.project();
914        let slice = this.buffer.sys_slice_mut();
915        unsafe {
916            libc::recvfrom(
917                fd,
918                slice.ptr() as _,
919                slice.len(),
920                *this.flags,
921                this.addr as *mut _ as _,
922                this.addr_len,
923            )
924        }
925    }
926}
927
928unsafe impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
929    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
930        let fd = self.fd.as_fd().as_raw_fd();
931        syscall!(self.as_mut().call(), wait_readable(fd))
932    }
933
934    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
935        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
936    }
937
938    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
939        syscall!(break self.as_mut().call())
940    }
941}
942
943impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
944    type Inner = (T, SockAddrStorage, socklen_t);
945
946    fn into_inner(self) -> Self::Inner {
947        (self.buffer, self.addr, self.addr_len)
948    }
949}
950
pin_project! {
    /// Receive data and source address into vectored buffer.
    pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
        // Socket to receive from.
        pub(crate) fd: S,
        // Destination buffers for the received bytes.
        #[pin]
        pub(crate) buffer: T,
        // System slices derived from `buffer`; refreshed by `set_msg`.
        pub(crate) slices: Vec<SysSlice>,
        // Storage that `msg.msg_name` points at for the source address.
        pub(crate) addr: SockAddrStorage,
        // Message header handed to `recvmsg`; zero-initialised by `new`.
        pub(crate) msg: libc::msghdr,
        // Flags forwarded to `recvmsg`.
        pub(crate) flags: i32,
        // Opts the type out of `Unpin`.
        _p: PhantomPinned,
    }
}
964
965impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
966    /// Create [`RecvFromVectored`].
967    pub fn new(fd: S, buffer: T, flags: i32) -> Self {
968        Self {
969            fd,
970            buffer,
971            slices: vec![],
972            addr: SockAddrStorage::zeroed(),
973            msg: unsafe { std::mem::zeroed() },
974            flags,
975            _p: PhantomPinned,
976        }
977    }
978}
979
980impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
981    fn set_msg(self: Pin<&mut Self>) {
982        let this = self.project();
983        *this.slices = this.buffer.sys_slices_mut();
984        this.msg.msg_name = this.addr as *mut _ as _;
985        this.msg.msg_namelen = this.addr.size_of() as _;
986        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
987        this.msg.msg_iovlen = this.slices.len() as _;
988    }
989
990    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
991        let this = self.project();
992        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
993    }
994}
995
996unsafe impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
997    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
998        self.as_mut().set_msg();
999        let fd = self.as_mut().project().fd.as_fd().as_raw_fd();
1000        syscall!(self.as_mut().call(), wait_readable(fd))
1001    }
1002
1003    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1004        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1005    }
1006
1007    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1008        syscall!(break self.as_mut().call())
1009    }
1010}
1011
1012impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
1013    type Inner = (T, SockAddrStorage, socklen_t);
1014
1015    fn into_inner(self) -> Self::Inner {
1016        (self.buffer, self.addr, self.msg.msg_namelen)
1017    }
1018}
1019
1020pin_project! {
1021    /// Send data to specified address.
1022    pub struct SendTo<T: IoBuf, S> {
1023        pub(crate) fd: S,
1024        pub(crate) buffer: T,
1025        pub(crate) addr: SockAddr,
1026        flags: i32,
1027        _p: PhantomPinned,
1028    }
1029}
1030
1031impl<T: IoBuf, S> SendTo<T, S> {
1032    /// Create [`SendTo`].
1033    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1034        Self {
1035            fd,
1036            buffer,
1037            addr,
1038            flags,
1039            _p: PhantomPinned,
1040        }
1041    }
1042}
1043
1044impl<T: IoBuf, S: AsFd> SendTo<T, S> {
1045    unsafe fn call(&self) -> libc::ssize_t {
1046        let slice = self.buffer.as_init();
1047        unsafe {
1048            libc::sendto(
1049                self.fd.as_fd().as_raw_fd(),
1050                slice.as_ptr() as _,
1051                slice.len(),
1052                self.flags,
1053                self.addr.as_ptr().cast(),
1054                self.addr.len(),
1055            )
1056        }
1057    }
1058}
1059
1060unsafe impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
1061    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1062        syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
1063    }
1064
1065    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1066        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1067    }
1068
1069    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1070        syscall!(break self.call())
1071    }
1072}
1073
1074impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
1075    type Inner = T;
1076
1077    fn into_inner(self) -> Self::Inner {
1078        self.buffer
1079    }
1080}
1081
1082pin_project! {
1083    /// Send data to specified address from vectored buffer.
1084    pub struct SendToVectored<T: IoVectoredBuf, S> {
1085        pub(crate) fd: S,
1086        #[pin]
1087        pub(crate) buffer: T,
1088        pub(crate) addr: SockAddr,
1089        pub(crate) slices: Vec<SysSlice>,
1090        pub(crate) msg: libc::msghdr,
1091        pub(crate) flags: i32,
1092        _p: PhantomPinned,
1093    }
1094}
1095
1096impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
1097    /// Create [`SendToVectored`].
1098    pub fn new(fd: S, buffer: T, addr: SockAddr, flags: i32) -> Self {
1099        Self {
1100            fd,
1101            buffer,
1102            addr,
1103            slices: vec![],
1104            msg: unsafe { std::mem::zeroed() },
1105            flags,
1106            _p: PhantomPinned,
1107        }
1108    }
1109}
1110
1111impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
1112    fn set_msg(self: Pin<&mut Self>) {
1113        let this = self.project();
1114        *this.slices = this.buffer.as_ref().sys_slices();
1115        this.msg.msg_name = this.addr as *mut _ as _;
1116        this.msg.msg_namelen = this.addr.len() as _;
1117        this.msg.msg_iov = this.slices.as_mut_ptr() as _;
1118        this.msg.msg_iovlen = this.slices.len() as _;
1119    }
1120
1121    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1122        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1123    }
1124}
1125
1126unsafe impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1127    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1128        self.as_mut().set_msg();
1129        let fd = self.fd.as_fd().as_raw_fd();
1130        syscall!(self.as_mut().call(), wait_writable(fd))
1131    }
1132
1133    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1134        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1135    }
1136
1137    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1138        syscall!(break self.as_mut().call())
1139    }
1140}
1141
1142impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1143    type Inner = T;
1144
1145    fn into_inner(self) -> Self::Inner {
1146        self.buffer
1147    }
1148}
1149
1150impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1151    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1152        let this = self.project();
1153        unsafe { libc::recvmsg(this.fd.as_fd().as_raw_fd(), this.msg, *this.flags) }
1154    }
1155}
1156
1157unsafe impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1158    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1159        self.as_mut().set_msg();
1160        let fd = self.fd.as_fd().as_raw_fd();
1161        syscall!(self.as_mut().call(), wait_readable(fd))
1162    }
1163
1164    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1165        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1166    }
1167
1168    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1169        syscall!(break self.as_mut().call())
1170    }
1171}
1172
1173impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1174    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
1175        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, self.flags) }
1176    }
1177}
1178
1179unsafe impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1180    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
1181        self.as_mut().set_msg();
1182        let fd = self.fd.as_fd().as_raw_fd();
1183        syscall!(self.as_mut().call(), wait_writable(fd))
1184    }
1185
1186    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1187        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1188    }
1189
1190    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1191        syscall!(break self.as_mut().call())
1192    }
1193}
1194
1195unsafe impl<S: AsFd> OpCode for PollOnce<S> {
1196    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1197        Ok(Decision::wait_for(
1198            self.fd.as_fd().as_raw_fd(),
1199            self.interest,
1200        ))
1201    }
1202
1203    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1204        Some(OpType::fd(self.fd.as_fd().as_raw_fd()))
1205    }
1206
1207    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1208        Poll::Ready(Ok(0))
1209    }
1210}
1211
#[cfg(linux_all)]
unsafe impl<S1: AsFd, S2: AsFd> OpCode for Splice<S1, S2> {
    /// Ask to wait until the input fd is readable and the output fd is
    /// writable; no syscall is attempted here.
    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
        use super::WaitArg;

        let source = WaitArg::readable(self.fd_in.as_fd().as_raw_fd());
        let sink = WaitArg::writable(self.fd_out.as_fd().as_raw_fd());
        Ok(Decision::wait_for_many([source, sink]))
    }

    /// This operation is keyed on both raw fds, input first.
    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
        let raw_in = self.fd_in.as_fd().as_raw_fd();
        let raw_out = self.fd_out.as_fd().as_raw_fd();
        Some(OpType::multi_fd([raw_in, raw_out]))
    }

    /// Call `splice` with `SPLICE_F_NONBLOCK` added to the stored flags.
    ///
    /// A negative stored offset is passed to `splice` as a null pointer;
    /// otherwise a pointer to a local copy of the offset is passed.
    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
        // Local copies: `splice` takes mutable offset pointers, and the
        // stored offsets are left untouched.
        let mut offset_in = self.offset_in;
        let mut offset_out = self.offset_out;
        let offset_in_ptr = if offset_in < 0 {
            std::ptr::null_mut()
        } else {
            &mut offset_in
        };
        let offset_out_ptr = if offset_out < 0 {
            std::ptr::null_mut()
        } else {
            &mut offset_out
        };

        let raw_in = self.fd_in.as_fd().as_raw_fd();
        let raw_out = self.fd_out.as_fd().as_raw_fd();
        let splice_flags = self.flags | libc::SPLICE_F_NONBLOCK;
        syscall!(
            break libc::splice(
                raw_in,
                offset_in_ptr,
                raw_out,
                offset_out_ptr,
                self.len,
                splice_flags,
            )
        )
    }
}