compio_driver/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{
13    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
14};
15#[cfg(not(gnulinux))]
16use libc::open;
17#[cfg(gnulinux)]
18use libc::open64 as open;
19#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
20use libc::{pread, preadv, pwrite, pwritev};
21#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
22use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
23use socket2::{SockAddr, Socket as Socket2};
24
25use super::{AsFd, Decision, OpCode, OpType, sockaddr_storage, socklen_t, syscall};
26use crate::op::*;
27pub use crate::unix::op::*;
28
29impl<
30    D: std::marker::Send + 'static,
31    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
32> OpCode for Asyncify<F, D>
33{
34    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
35        Ok(Decision::Blocking)
36    }
37
38    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
39        // Safety: self won't be moved
40        let this = unsafe { self.get_unchecked_mut() };
41        let f = this
42            .f
43            .take()
44            .expect("the operate method could only be called once");
45        let BufResult(res, data) = f();
46        this.data = Some(data);
47        Poll::Ready(res)
48    }
49}
50
51impl OpCode for OpenFile {
52    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
53        Ok(Decision::Blocking)
54    }
55
56    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
57        Poll::Ready(Ok(syscall!(open(
58            self.path.as_ptr(),
59            self.flags | libc::O_CLOEXEC,
60            self.mode as libc::c_int
61        ))? as _))
62    }
63}
64
65impl OpCode for CloseFile {
66    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
67        Ok(Decision::Blocking)
68    }
69
70    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
71        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
72    }
73}
74
/// Get metadata of an opened file.
pub struct FileStat<S> {
    /// Handle of the file whose metadata is queried.
    pub(crate) fd: S,
    /// Result storage: zero-initialised by `new`, filled in by `operate` and
    /// handed back by `into_inner`.
    pub(crate) stat: libc::stat,
}
80
81impl<S> FileStat<S> {
82    /// Create [`FileStat`].
83    pub fn new(fd: S) -> Self {
84        Self {
85            fd,
86            stat: unsafe { std::mem::zeroed() },
87        }
88    }
89}
90
91impl<S: AsFd> OpCode for FileStat<S> {
92    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
93        Ok(Decision::Blocking)
94    }
95
96    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
97        let this = unsafe { self.get_unchecked_mut() };
98        #[cfg(gnulinux)]
99        {
100            let mut s: libc::statx = unsafe { std::mem::zeroed() };
101            static EMPTY_NAME: &[u8] = b"\0";
102            syscall!(libc::statx(
103                this.fd.as_fd().as_raw_fd(),
104                EMPTY_NAME.as_ptr().cast(),
105                libc::AT_EMPTY_PATH,
106                0,
107                &mut s
108            ))?;
109            this.stat = statx_to_stat(s);
110            Poll::Ready(Ok(0))
111        }
112        #[cfg(not(gnulinux))]
113        {
114            Poll::Ready(Ok(
115                syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut this.stat))? as _,
116            ))
117        }
118    }
119}
120
121impl<S> IntoInner for FileStat<S> {
122    type Inner = libc::stat;
123
124    fn into_inner(self) -> Self::Inner {
125        self.stat
126    }
127}
128
/// Get metadata from path.
pub struct PathStat {
    /// NUL-terminated path of the file to query.
    pub(crate) path: CString,
    /// Result storage: zero-initialised by `new`, filled in by `operate` and
    /// handed back by `into_inner`.
    pub(crate) stat: libc::stat,
    /// When `false`, a symlink itself is queried instead of its target
    /// (`AT_SYMLINK_NOFOLLOW` / `lstat` in `operate`).
    pub(crate) follow_symlink: bool,
}
135
136impl PathStat {
137    /// Create [`PathStat`].
138    pub fn new(path: CString, follow_symlink: bool) -> Self {
139        Self {
140            path,
141            stat: unsafe { std::mem::zeroed() },
142            follow_symlink,
143        }
144    }
145}
146
147impl OpCode for PathStat {
148    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
149        Ok(Decision::Blocking)
150    }
151
152    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
153        #[cfg(gnulinux)]
154        {
155            let mut flags = libc::AT_EMPTY_PATH;
156            if !self.follow_symlink {
157                flags |= libc::AT_SYMLINK_NOFOLLOW;
158            }
159            let mut s: libc::statx = unsafe { std::mem::zeroed() };
160            syscall!(libc::statx(
161                libc::AT_FDCWD,
162                self.path.as_ptr(),
163                flags,
164                0,
165                &mut s
166            ))?;
167            self.stat = statx_to_stat(s);
168            Poll::Ready(Ok(0))
169        }
170        #[cfg(not(gnulinux))]
171        {
172            let f = if self.follow_symlink {
173                libc::stat
174            } else {
175                libc::lstat
176            };
177            Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
178        }
179    }
180}
181
182impl IntoInner for PathStat {
183    type Inner = libc::stat;
184
185    fn into_inner(self) -> Self::Inner {
186        self.stat
187    }
188}
189
190impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
191    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
192        #[cfg(aio)]
193        {
194            let this = unsafe { self.get_unchecked_mut() };
195            let slice = this.buffer.as_mut_slice();
196
197            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
198            this.aiocb.aio_offset = this.offset as _;
199            this.aiocb.aio_buf = slice.as_mut_ptr().cast();
200            this.aiocb.aio_nbytes = slice.len();
201
202            Ok(Decision::aio(&mut this.aiocb, libc::aio_read))
203        }
204        #[cfg(not(aio))]
205        {
206            Ok(Decision::Blocking)
207        }
208    }
209
210    #[cfg(aio)]
211    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
212        Some(OpType::Aio(NonNull::from(
213            &mut unsafe { self.get_unchecked_mut() }.aiocb,
214        )))
215    }
216
217    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
218        let fd = self.fd.as_fd().as_raw_fd();
219        let offset = self.offset;
220        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
221        syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, offset as _,))
222    }
223}
224
225impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
226    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
227        #[cfg(freebsd)]
228        {
229            let this = unsafe { self.get_unchecked_mut() };
230            this.slices = unsafe { this.buffer.io_slices_mut() };
231
232            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
233            this.aiocb.aio_offset = this.offset as _;
234            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
235            this.aiocb.aio_nbytes = this.slices.len();
236
237            Ok(Decision::aio(&mut this.aiocb, libc::aio_readv))
238        }
239        #[cfg(not(freebsd))]
240        {
241            Ok(Decision::Blocking)
242        }
243    }
244
245    #[cfg(freebsd)]
246    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
247        Some(OpType::Aio(NonNull::from(
248            &mut unsafe { self.get_unchecked_mut() }.aiocb,
249        )))
250    }
251
252    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
253        let this = unsafe { self.get_unchecked_mut() };
254        this.slices = unsafe { this.buffer.io_slices_mut() };
255        syscall!(
256            break preadv(
257                this.fd.as_fd().as_raw_fd(),
258                this.slices.as_ptr() as _,
259                this.slices.len() as _,
260                this.offset as _,
261            )
262        )
263    }
264}
265
266impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
267    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
268        #[cfg(aio)]
269        {
270            let this = unsafe { self.get_unchecked_mut() };
271            let slice = this.buffer.as_slice();
272
273            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
274            this.aiocb.aio_offset = this.offset as _;
275            this.aiocb.aio_buf = slice.as_ptr().cast_mut().cast();
276            this.aiocb.aio_nbytes = slice.len();
277
278            Ok(Decision::aio(&mut this.aiocb, libc::aio_write))
279        }
280        #[cfg(not(aio))]
281        {
282            Ok(Decision::Blocking)
283        }
284    }
285
286    #[cfg(aio)]
287    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
288        Some(OpType::Aio(NonNull::from(
289            &mut unsafe { self.get_unchecked_mut() }.aiocb,
290        )))
291    }
292
293    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
294        let slice = self.buffer.as_slice();
295        syscall!(
296            break pwrite(
297                self.fd.as_fd().as_raw_fd(),
298                slice.as_ptr() as _,
299                slice.len() as _,
300                self.offset as _,
301            )
302        )
303    }
304}
305
306impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
307    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
308        #[cfg(freebsd)]
309        {
310            let this = unsafe { self.get_unchecked_mut() };
311            this.slices = unsafe { this.buffer.io_slices() };
312
313            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
314            this.aiocb.aio_offset = this.offset as _;
315            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
316            this.aiocb.aio_nbytes = this.slices.len();
317
318            Ok(Decision::aio(&mut this.aiocb, libc::aio_writev))
319        }
320        #[cfg(not(freebsd))]
321        {
322            Ok(Decision::Blocking)
323        }
324    }
325
326    #[cfg(freebsd)]
327    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
328        Some(OpType::Aio(NonNull::from(
329            &mut unsafe { self.get_unchecked_mut() }.aiocb,
330        )))
331    }
332
333    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
334        let this = unsafe { self.get_unchecked_mut() };
335        this.slices = unsafe { this.buffer.io_slices() };
336        syscall!(
337            break pwritev(
338                this.fd.as_fd().as_raw_fd(),
339                this.slices.as_ptr() as _,
340                this.slices.len() as _,
341                this.offset as _,
342            )
343        )
344    }
345}
346
347impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
348    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
349        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
350    }
351
352    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
353        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
354    }
355
356    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
357        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
358    }
359}
360
361impl<S: AsFd> OpCode for Sync<S> {
362    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
363        #[cfg(aio)]
364        {
365            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
366                libc::aio_fsync(libc::O_SYNC, aiocbp)
367            }
368            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
369                libc::aio_fsync(libc::O_DSYNC, aiocbp)
370            }
371
372            let this = unsafe { self.get_unchecked_mut() };
373            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
374
375            let f = if this.datasync {
376                aio_fdatasync
377            } else {
378                aio_fsync
379            };
380
381            Ok(Decision::aio(&mut this.aiocb, f))
382        }
383        #[cfg(not(aio))]
384        {
385            Ok(Decision::Blocking)
386        }
387    }
388
389    #[cfg(aio)]
390    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
391        Some(OpType::Aio(NonNull::from(
392            &mut unsafe { self.get_unchecked_mut() }.aiocb,
393        )))
394    }
395
396    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
397        #[cfg(datasync)]
398        {
399            Poll::Ready(Ok(syscall!(if self.datasync {
400                libc::fdatasync(self.fd.as_fd().as_raw_fd())
401            } else {
402                libc::fsync(self.fd.as_fd().as_raw_fd())
403            })? as _))
404        }
405        #[cfg(not(datasync))]
406        {
407            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
408        }
409    }
410}
411
412impl OpCode for Unlink {
413    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
414        Ok(Decision::Blocking)
415    }
416
417    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
418        if self.dir {
419            syscall!(libc::rmdir(self.path.as_ptr()))?;
420        } else {
421            syscall!(libc::unlink(self.path.as_ptr()))?;
422        }
423        Poll::Ready(Ok(0))
424    }
425}
426
427impl OpCode for CreateDir {
428    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
429        Ok(Decision::Blocking)
430    }
431
432    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
433        syscall!(libc::mkdir(self.path.as_ptr(), self.mode))?;
434        Poll::Ready(Ok(0))
435    }
436}
437
438impl OpCode for Rename {
439    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
440        Ok(Decision::Blocking)
441    }
442
443    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
444        syscall!(libc::rename(self.old_path.as_ptr(), self.new_path.as_ptr()))?;
445        Poll::Ready(Ok(0))
446    }
447}
448
449impl OpCode for Symlink {
450    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
451        Ok(Decision::Blocking)
452    }
453
454    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
455        syscall!(libc::symlink(self.source.as_ptr(), self.target.as_ptr()))?;
456        Poll::Ready(Ok(0))
457    }
458}
459
460impl OpCode for HardLink {
461    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
462        Ok(Decision::Blocking)
463    }
464
465    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
466        syscall!(libc::link(self.source.as_ptr(), self.target.as_ptr()))?;
467        Poll::Ready(Ok(0))
468    }
469}
470
impl CreateSocket {
    /// Creates the socket and returns its raw descriptor, configured as
    /// close-on-exec and non-blocking.
    ///
    /// On targets in the first `cfg` list both properties are requested
    /// through `SOCK_CLOEXEC | SOCK_NONBLOCK` in the `socket` call itself;
    /// on the remaining targets they are applied afterwards through
    /// `socket2`. Apple targets additionally get `SO_NOSIGPIPE` set.
    ///
    /// # Errors
    ///
    /// Returns the error of the `socket` call or of any follow-up option
    /// call. In the latter case the temporary `Socket2` wrapper goes out of
    /// scope before `into_raw_fd` is reached.
    unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
        // `mut` is only needed on targets that OR in the extra type flags.
        #[allow(unused_mut)]
        let mut ty: i32 = self.socket_type;
        #[cfg(any(
            target_os = "android",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "fuchsia",
            target_os = "hurd",
            target_os = "illumos",
            target_os = "linux",
            target_os = "netbsd",
            target_os = "openbsd",
            target_os = "cygwin",
        ))]
        {
            ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
        }
        let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
        // Wrap the descriptor so the option setters below can be used.
        let socket = Socket2::from_raw_fd(fd);
        // Targets without `SOCK_CLOEXEC` in the type flags (espidf and vita
        // are excluded here as well).
        #[cfg(not(any(
            target_os = "android",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "fuchsia",
            target_os = "hurd",
            target_os = "illumos",
            target_os = "linux",
            target_os = "netbsd",
            target_os = "openbsd",
            target_os = "espidf",
            target_os = "vita",
            target_os = "cygwin",
        )))]
        socket.set_cloexec(true)?;
        #[cfg(any(
            target_os = "ios",
            target_os = "macos",
            target_os = "tvos",
            target_os = "watchos",
        ))]
        socket.set_nosigpipe(true)?;
        // Targets without `SOCK_NONBLOCK` in the type flags.
        #[cfg(not(any(
            target_os = "android",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "fuchsia",
            target_os = "hurd",
            target_os = "illumos",
            target_os = "linux",
            target_os = "netbsd",
            target_os = "openbsd",
            target_os = "cygwin",
        )))]
        socket.set_nonblocking(true)?;
        // Hand ownership of the descriptor back to the caller.
        Ok(socket.into_raw_fd())
    }
}
530
531impl OpCode for CreateSocket {
532    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
533        Ok(Decision::Blocking)
534    }
535
536    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
537        Poll::Ready(Ok(unsafe { self.call()? } as _))
538    }
539}
540
541impl<S: AsFd> OpCode for ShutdownSocket<S> {
542    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
543        Ok(Decision::Blocking)
544    }
545
546    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
547        Poll::Ready(Ok(
548            syscall!(libc::shutdown(self.fd.as_fd().as_raw_fd(), self.how()))? as _,
549        ))
550    }
551}
552
553impl OpCode for CloseSocket {
554    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
555        Ok(Decision::Blocking)
556    }
557
558    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
559        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
560    }
561}
562
impl<S: AsFd> Accept<S> {
    /// Performs one accept attempt and returns the raw result: the accepted
    /// descriptor, or `-1` on failure.
    ///
    /// The peer address is written into `self.buffer` and its length into
    /// `self.addr_len`. The accepted socket is made non-blocking and
    /// close-on-exec: via `accept4` flags where that call is used, otherwise
    /// by separate `socket2` calls after a plain `accept`.
    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
        let this = self.get_unchecked_mut();
        #[cfg(any(
            target_os = "android",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "fuchsia",
            target_os = "illumos",
            target_os = "linux",
            target_os = "netbsd",
            target_os = "openbsd",
            target_os = "cygwin",
        ))]
        {
            libc::accept4(
                this.fd.as_fd().as_raw_fd(),
                &mut this.buffer as *mut _ as *mut _,
                &mut this.addr_len,
                libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
            )
        }
        #[cfg(not(any(
            target_os = "android",
            target_os = "dragonfly",
            target_os = "freebsd",
            target_os = "fuchsia",
            target_os = "illumos",
            target_os = "linux",
            target_os = "netbsd",
            target_os = "openbsd",
            target_os = "cygwin",
        )))]
        {
            // The closure lets `?` be used for the multi-step fallback; any
            // failure is then collapsed into the C-style `-1` below.
            // NOTE(review): the concrete error from `set_cloexec` /
            // `set_nonblocking` is discarded here; callers presumably read
            // the OS error afterwards — confirm it is still meaningful.
            || -> io::Result<libc::c_int> {
                let fd = syscall!(libc::accept(
                    this.fd.as_fd().as_raw_fd(),
                    &mut this.buffer as *mut _ as *mut _,
                    &mut this.addr_len,
                ))?;
                let socket = Socket2::from_raw_fd(fd);
                socket.set_cloexec(true)?;
                socket.set_nonblocking(true)?;
                Ok(socket.into_raw_fd())
            }()
            .unwrap_or(-1)
        }
    }
}
612
613impl<S: AsFd> OpCode for Accept<S> {
614    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
615        let fd = self.fd.as_fd().as_raw_fd();
616        syscall!(self.as_mut().call(), wait_readable(fd))
617    }
618
619    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
620        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
621    }
622
623    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
624        let res = syscall!(break self.as_mut().call());
625        if let Poll::Ready(Ok(fd)) = res {
626            unsafe {
627                self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
628            }
629        }
630        res
631    }
632}
633
634impl<S: AsFd> OpCode for Connect<S> {
635    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
636        syscall!(
637            libc::connect(
638                self.fd.as_fd().as_raw_fd(),
639                self.addr.as_ptr(),
640                self.addr.len()
641            ),
642            wait_writable(self.fd.as_fd().as_raw_fd())
643        )
644    }
645
646    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
647        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
648    }
649
650    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
651        let mut err: libc::c_int = 0;
652        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
653
654        syscall!(libc::getsockopt(
655            self.fd.as_fd().as_raw_fd(),
656            libc::SOL_SOCKET,
657            libc::SO_ERROR,
658            &mut err as *mut _ as *mut _,
659            &mut err_len
660        ))?;
661
662        let res = if err == 0 {
663            Ok(0)
664        } else {
665            Err(io::Error::from_raw_os_error(err))
666        };
667        Poll::Ready(res)
668    }
669}
670
671impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
672    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
673        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
674    }
675
676    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
677        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
678    }
679
680    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
681        let fd = self.fd.as_fd().as_raw_fd();
682        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
683        syscall!(break libc::read(fd, slice.as_mut_ptr() as _, slice.len()))
684    }
685}
686
687impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
688    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
689        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
690    }
691
692    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
693        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
694    }
695
696    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
697        let this = unsafe { self.get_unchecked_mut() };
698        this.slices = unsafe { this.buffer.io_slices_mut() };
699        syscall!(
700            break libc::readv(
701                this.fd.as_fd().as_raw_fd(),
702                this.slices.as_ptr() as _,
703                this.slices.len() as _
704            )
705        )
706    }
707}
708
709impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
710    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
711        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
712    }
713
714    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
715        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
716    }
717
718    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
719        let slice = self.buffer.as_slice();
720        syscall!(
721            break libc::write(
722                self.fd.as_fd().as_raw_fd(),
723                slice.as_ptr() as _,
724                slice.len()
725            )
726        )
727    }
728}
729
730impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
731    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
732        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
733    }
734
735    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
736        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
737    }
738
739    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
740        let this = unsafe { self.get_unchecked_mut() };
741        this.slices = unsafe { this.buffer.io_slices() };
742        syscall!(
743            break libc::writev(
744                this.fd.as_fd().as_raw_fd(),
745                this.slices.as_ptr() as _,
746                this.slices.len() as _
747            )
748        )
749    }
750}
751
752impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
753    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
754        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
755    }
756
757    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
758        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
759    }
760
761    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
762        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
763    }
764}
765
/// Receive data and source address.
pub struct RecvFrom<T: IoBufMut, S> {
    /// Socket handle to receive from.
    pub(crate) fd: S,
    /// Destination buffer for the received data.
    pub(crate) buffer: T,
    /// Storage that `recvfrom` fills with the sender's address.
    pub(crate) addr: sockaddr_storage,
    /// In/out length for `addr`; initialised by `new` to the size of
    /// `sockaddr_storage`.
    pub(crate) addr_len: socklen_t,
    // Marks the type as `!Unpin`; `call` passes pointers to the fields above
    // to the OS.
    _p: PhantomPinned,
}
774
775impl<T: IoBufMut, S> RecvFrom<T, S> {
776    /// Create [`RecvFrom`].
777    pub fn new(fd: S, buffer: T) -> Self {
778        Self {
779            fd,
780            buffer,
781            addr: unsafe { std::mem::zeroed() },
782            addr_len: std::mem::size_of::<sockaddr_storage>() as _,
783            _p: PhantomPinned,
784        }
785    }
786}
787
788impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
789    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
790        let this = self.get_unchecked_mut();
791        let fd = this.fd.as_fd().as_raw_fd();
792        let slice = this.buffer.as_mut_slice();
793        libc::recvfrom(
794            fd,
795            slice.as_mut_ptr() as _,
796            slice.len(),
797            0,
798            &mut this.addr as *mut _ as _,
799            &mut this.addr_len,
800        )
801    }
802}
803
804impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
805    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
806        let fd = self.fd.as_fd().as_raw_fd();
807        syscall!(self.as_mut().call(), wait_readable(fd))
808    }
809
810    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
811        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
812    }
813
814    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
815        syscall!(break self.as_mut().call())
816    }
817}
818
819impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
820    type Inner = (T, sockaddr_storage, socklen_t);
821
822    fn into_inner(self) -> Self::Inner {
823        (self.buffer, self.addr, self.addr_len)
824    }
825}
826
/// Receive data and source address into vectored buffer.
pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
    /// Socket handle to receive from.
    pub(crate) fd: S,
    /// Destination buffers for the received data.
    pub(crate) buffer: T,
    /// Slice descriptors derived from `buffer`; rebuilt by `set_msg` and
    /// referenced by `msg.msg_iov`.
    pub(crate) slices: Vec<IoSliceMut>,
    /// Storage referenced by `msg.msg_name` for the sender's address.
    pub(crate) addr: sockaddr_storage,
    /// Message header passed to `recvmsg`; populated by `set_msg`.
    pub(crate) msg: libc::msghdr,
    // Marks the type as `!Unpin`; `msg` holds pointers into the fields above.
    _p: PhantomPinned,
}
836
837impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
838    /// Create [`RecvFromVectored`].
839    pub fn new(fd: S, buffer: T) -> Self {
840        Self {
841            fd,
842            buffer,
843            slices: vec![],
844            addr: unsafe { std::mem::zeroed() },
845            msg: unsafe { std::mem::zeroed() },
846            _p: PhantomPinned,
847        }
848    }
849}
850
851impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
852    fn set_msg(&mut self) {
853        self.slices = unsafe { self.buffer.io_slices_mut() };
854        self.msg = libc::msghdr {
855            msg_name: &mut self.addr as *mut _ as _,
856            msg_namelen: std::mem::size_of_val(&self.addr) as _,
857            msg_iov: self.slices.as_mut_ptr() as _,
858            msg_iovlen: self.slices.len() as _,
859            msg_control: std::ptr::null_mut(),
860            msg_controllen: 0,
861            msg_flags: 0,
862        };
863    }
864
865    unsafe fn call(&mut self) -> libc::ssize_t {
866        libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0)
867    }
868}
869
870impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
871    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
872        let this = unsafe { self.get_unchecked_mut() };
873        this.set_msg();
874        syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
875    }
876
877    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
878        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
879    }
880
881    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
882        let this = unsafe { self.get_unchecked_mut() };
883        syscall!(break this.call())
884    }
885}
886
887impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
888    type Inner = (T, sockaddr_storage, socklen_t);
889
890    fn into_inner(self) -> Self::Inner {
891        (self.buffer, self.addr, self.msg.msg_namelen)
892    }
893}
894
/// Send data to specified address.
pub struct SendTo<T: IoBuf, S> {
    /// Socket handle to send on.
    pub(crate) fd: S,
    /// Data to send.
    pub(crate) buffer: T,
    /// Destination address passed to `sendto`.
    pub(crate) addr: SockAddr,
    // Marks the type as `!Unpin`.
    _p: PhantomPinned,
}
902
903impl<T: IoBuf, S> SendTo<T, S> {
904    /// Create [`SendTo`].
905    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
906        Self {
907            fd,
908            buffer,
909            addr,
910            _p: PhantomPinned,
911        }
912    }
913}
914
915impl<T: IoBuf, S: AsFd> SendTo<T, S> {
916    unsafe fn call(&self) -> libc::ssize_t {
917        let slice = self.buffer.as_slice();
918        libc::sendto(
919            self.fd.as_fd().as_raw_fd(),
920            slice.as_ptr() as _,
921            slice.len(),
922            0,
923            self.addr.as_ptr(),
924            self.addr.len(),
925        )
926    }
927}
928
929impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
930    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
931        syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
932    }
933
934    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
935        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
936    }
937
938    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
939        syscall!(break self.call())
940    }
941}
942
943impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
944    type Inner = T;
945
946    fn into_inner(self) -> Self::Inner {
947        self.buffer
948    }
949}
950
/// Send data to specified address from vectored buffer.
pub struct SendToVectored<T: IoVectoredBuf, S> {
    /// Socket handle to send on.
    pub(crate) fd: S,
    /// Buffers holding the data to send.
    pub(crate) buffer: T,
    /// Destination address referenced by `msg.msg_name`.
    pub(crate) addr: SockAddr,
    /// Slice descriptors derived from `buffer`; rebuilt by `set_msg` and
    /// referenced by `msg.msg_iov`.
    pub(crate) slices: Vec<IoSlice>,
    /// Message header passed to `sendmsg`; populated by `set_msg`.
    pub(crate) msg: libc::msghdr,
    // Marks the type as `!Unpin`; `msg` holds pointers into the fields above.
    _p: PhantomPinned,
}
960
961impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
962    /// Create [`SendToVectored`].
963    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
964        Self {
965            fd,
966            buffer,
967            addr,
968            slices: vec![],
969            msg: unsafe { std::mem::zeroed() },
970            _p: PhantomPinned,
971        }
972    }
973}
974
975impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
976    fn set_msg(&mut self) {
977        self.slices = unsafe { self.buffer.io_slices() };
978        self.msg = libc::msghdr {
979            msg_name: &mut self.addr as *mut _ as _,
980            msg_namelen: std::mem::size_of_val(&self.addr) as _,
981            msg_iov: self.slices.as_mut_ptr() as _,
982            msg_iovlen: self.slices.len() as _,
983            msg_control: std::ptr::null_mut(),
984            msg_controllen: 0,
985            msg_flags: 0,
986        };
987    }
988
989    unsafe fn call(&self) -> libc::ssize_t {
990        libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0)
991    }
992}
993
994impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
995    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
996        let this = unsafe { self.get_unchecked_mut() };
997        this.set_msg();
998        syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
999    }
1000
1001    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1002        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1003    }
1004
1005    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1006        syscall!(break self.call())
1007    }
1008}
1009
1010impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1011    type Inner = T;
1012
1013    fn into_inner(self) -> Self::Inner {
1014        self.buffer
1015    }
1016}
1017
1018impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1019    unsafe fn call(&mut self) -> libc::ssize_t {
1020        libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0)
1021    }
1022}
1023
1024impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1025    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1026        let this = unsafe { self.get_unchecked_mut() };
1027        unsafe { this.set_msg() };
1028        syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
1029    }
1030
1031    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1032        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1033    }
1034
1035    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1036        let this = unsafe { self.get_unchecked_mut() };
1037        syscall!(break this.call())
1038    }
1039}
1040
1041impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1042    unsafe fn call(&self) -> libc::ssize_t {
1043        libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0)
1044    }
1045}
1046
1047impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1048    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1049        let this = unsafe { self.get_unchecked_mut() };
1050        unsafe { this.set_msg() };
1051        syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
1052    }
1053
1054    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1055        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1056    }
1057
1058    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1059        syscall!(break self.call())
1060    }
1061}
1062
1063impl<S: AsFd> OpCode for PollOnce<S> {
1064    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1065        Ok(Decision::wait_for(
1066            self.fd.as_fd().as_raw_fd(),
1067            self.interest,
1068        ))
1069    }
1070
1071    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1072        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1073    }
1074
1075    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1076        Poll::Ready(Ok(0))
1077    }
1078}