compio_driver/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{
13    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
14};
15#[cfg(not(gnulinux))]
16use libc::open;
17#[cfg(gnulinux)]
18use libc::open64 as open;
19#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
20use libc::{pread, preadv, pwrite, pwritev};
21#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
22use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
23use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
24
25use super::{AsFd, Decision, OpCode, OpType, syscall};
26use crate::op::*;
27pub use crate::unix::op::*;
28
29impl<
30    D: std::marker::Send + 'static,
31    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
32> OpCode for Asyncify<F, D>
33{
34    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
35        Ok(Decision::Blocking)
36    }
37
38    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
39        // Safety: self won't be moved
40        let this = unsafe { self.get_unchecked_mut() };
41        let f = this
42            .f
43            .take()
44            .expect("the operate method could only be called once");
45        let BufResult(res, data) = f();
46        this.data = Some(data);
47        Poll::Ready(res)
48    }
49}
50
51impl OpCode for OpenFile {
52    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
53        Ok(Decision::Blocking)
54    }
55
56    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
57        Poll::Ready(Ok(syscall!(open(
58            self.path.as_ptr(),
59            self.flags | libc::O_CLOEXEC,
60            self.mode as libc::c_int
61        ))? as _))
62    }
63}
64
65impl OpCode for CloseFile {
66    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
67        Ok(Decision::Blocking)
68    }
69
70    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
71        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
72    }
73}
74
75/// Get metadata of an opened file.
76pub struct FileStat<S> {
77    pub(crate) fd: S,
78    pub(crate) stat: libc::stat,
79}
80
81impl<S> FileStat<S> {
82    /// Create [`FileStat`].
83    pub fn new(fd: S) -> Self {
84        Self {
85            fd,
86            stat: unsafe { std::mem::zeroed() },
87        }
88    }
89}
90
91impl<S: AsFd> OpCode for FileStat<S> {
92    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
93        Ok(Decision::Blocking)
94    }
95
96    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
97        let this = unsafe { self.get_unchecked_mut() };
98        #[cfg(gnulinux)]
99        {
100            let mut s: libc::statx = unsafe { std::mem::zeroed() };
101            static EMPTY_NAME: &[u8] = b"\0";
102            syscall!(libc::statx(
103                this.fd.as_fd().as_raw_fd(),
104                EMPTY_NAME.as_ptr().cast(),
105                libc::AT_EMPTY_PATH,
106                0,
107                &mut s
108            ))?;
109            this.stat = statx_to_stat(s);
110            Poll::Ready(Ok(0))
111        }
112        #[cfg(not(gnulinux))]
113        {
114            Poll::Ready(Ok(
115                syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut this.stat))? as _,
116            ))
117        }
118    }
119}
120
121impl<S> IntoInner for FileStat<S> {
122    type Inner = libc::stat;
123
124    fn into_inner(self) -> Self::Inner {
125        self.stat
126    }
127}
128
129/// Get metadata from path.
130pub struct PathStat {
131    pub(crate) path: CString,
132    pub(crate) stat: libc::stat,
133    pub(crate) follow_symlink: bool,
134}
135
136impl PathStat {
137    /// Create [`PathStat`].
138    pub fn new(path: CString, follow_symlink: bool) -> Self {
139        Self {
140            path,
141            stat: unsafe { std::mem::zeroed() },
142            follow_symlink,
143        }
144    }
145}
146
147impl OpCode for PathStat {
148    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
149        Ok(Decision::Blocking)
150    }
151
152    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
153        #[cfg(gnulinux)]
154        {
155            let mut flags = libc::AT_EMPTY_PATH;
156            if !self.follow_symlink {
157                flags |= libc::AT_SYMLINK_NOFOLLOW;
158            }
159            let mut s: libc::statx = unsafe { std::mem::zeroed() };
160            syscall!(libc::statx(
161                libc::AT_FDCWD,
162                self.path.as_ptr(),
163                flags,
164                0,
165                &mut s
166            ))?;
167            self.stat = statx_to_stat(s);
168            Poll::Ready(Ok(0))
169        }
170        #[cfg(not(gnulinux))]
171        {
172            let f = if self.follow_symlink {
173                libc::stat
174            } else {
175                libc::lstat
176            };
177            Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
178        }
179    }
180}
181
182impl IntoInner for PathStat {
183    type Inner = libc::stat;
184
185    fn into_inner(self) -> Self::Inner {
186        self.stat
187    }
188}
189
190impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
191    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
192        #[cfg(aio)]
193        {
194            let this = unsafe { self.get_unchecked_mut() };
195            let slice = this.buffer.as_mut_slice();
196
197            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
198            this.aiocb.aio_offset = this.offset as _;
199            this.aiocb.aio_buf = slice.as_mut_ptr().cast();
200            this.aiocb.aio_nbytes = slice.len();
201
202            Ok(Decision::aio(&mut this.aiocb, libc::aio_read))
203        }
204        #[cfg(not(aio))]
205        {
206            Ok(Decision::Blocking)
207        }
208    }
209
210    #[cfg(aio)]
211    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
212        Some(OpType::Aio(NonNull::from(
213            &mut unsafe { self.get_unchecked_mut() }.aiocb,
214        )))
215    }
216
217    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
218        let fd = self.fd.as_fd().as_raw_fd();
219        let offset = self.offset;
220        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
221        syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, offset as _,))
222    }
223}
224
225impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
226    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
227        #[cfg(freebsd)]
228        {
229            let this = unsafe { self.get_unchecked_mut() };
230            this.slices = unsafe { this.buffer.io_slices_mut() };
231
232            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
233            this.aiocb.aio_offset = this.offset as _;
234            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
235            this.aiocb.aio_nbytes = this.slices.len();
236
237            Ok(Decision::aio(&mut this.aiocb, libc::aio_readv))
238        }
239        #[cfg(not(freebsd))]
240        {
241            Ok(Decision::Blocking)
242        }
243    }
244
245    #[cfg(freebsd)]
246    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
247        Some(OpType::Aio(NonNull::from(
248            &mut unsafe { self.get_unchecked_mut() }.aiocb,
249        )))
250    }
251
252    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
253        let this = unsafe { self.get_unchecked_mut() };
254        this.slices = unsafe { this.buffer.io_slices_mut() };
255        syscall!(
256            break preadv(
257                this.fd.as_fd().as_raw_fd(),
258                this.slices.as_ptr() as _,
259                this.slices.len() as _,
260                this.offset as _,
261            )
262        )
263    }
264}
265
266impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
267    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
268        #[cfg(aio)]
269        {
270            let this = unsafe { self.get_unchecked_mut() };
271            let slice = this.buffer.as_slice();
272
273            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
274            this.aiocb.aio_offset = this.offset as _;
275            this.aiocb.aio_buf = slice.as_ptr().cast_mut().cast();
276            this.aiocb.aio_nbytes = slice.len();
277
278            Ok(Decision::aio(&mut this.aiocb, libc::aio_write))
279        }
280        #[cfg(not(aio))]
281        {
282            Ok(Decision::Blocking)
283        }
284    }
285
286    #[cfg(aio)]
287    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
288        Some(OpType::Aio(NonNull::from(
289            &mut unsafe { self.get_unchecked_mut() }.aiocb,
290        )))
291    }
292
293    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
294        let slice = self.buffer.as_slice();
295        syscall!(
296            break pwrite(
297                self.fd.as_fd().as_raw_fd(),
298                slice.as_ptr() as _,
299                slice.len() as _,
300                self.offset as _,
301            )
302        )
303    }
304}
305
306impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
307    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
308        #[cfg(freebsd)]
309        {
310            let this = unsafe { self.get_unchecked_mut() };
311            this.slices = unsafe { this.buffer.io_slices() };
312
313            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
314            this.aiocb.aio_offset = this.offset as _;
315            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
316            this.aiocb.aio_nbytes = this.slices.len();
317
318            Ok(Decision::aio(&mut this.aiocb, libc::aio_writev))
319        }
320        #[cfg(not(freebsd))]
321        {
322            Ok(Decision::Blocking)
323        }
324    }
325
326    #[cfg(freebsd)]
327    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
328        Some(OpType::Aio(NonNull::from(
329            &mut unsafe { self.get_unchecked_mut() }.aiocb,
330        )))
331    }
332
333    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
334        let this = unsafe { self.get_unchecked_mut() };
335        this.slices = unsafe { this.buffer.io_slices() };
336        syscall!(
337            break pwritev(
338                this.fd.as_fd().as_raw_fd(),
339                this.slices.as_ptr() as _,
340                this.slices.len() as _,
341                this.offset as _,
342            )
343        )
344    }
345}
346
347impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
348    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
349        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
350    }
351
352    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
353        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
354    }
355
356    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
357        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
358    }
359}
360
361impl<S: AsFd> OpCode for Sync<S> {
362    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
363        #[cfg(aio)]
364        {
365            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
366                libc::aio_fsync(libc::O_SYNC, aiocbp)
367            }
368            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
369                libc::aio_fsync(libc::O_DSYNC, aiocbp)
370            }
371
372            let this = unsafe { self.get_unchecked_mut() };
373            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
374
375            let f = if this.datasync {
376                aio_fdatasync
377            } else {
378                aio_fsync
379            };
380
381            Ok(Decision::aio(&mut this.aiocb, f))
382        }
383        #[cfg(not(aio))]
384        {
385            Ok(Decision::Blocking)
386        }
387    }
388
389    #[cfg(aio)]
390    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
391        Some(OpType::Aio(NonNull::from(
392            &mut unsafe { self.get_unchecked_mut() }.aiocb,
393        )))
394    }
395
396    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
397        #[cfg(datasync)]
398        {
399            Poll::Ready(Ok(syscall!(if self.datasync {
400                libc::fdatasync(self.fd.as_fd().as_raw_fd())
401            } else {
402                libc::fsync(self.fd.as_fd().as_raw_fd())
403            })? as _))
404        }
405        #[cfg(not(datasync))]
406        {
407            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
408        }
409    }
410}
411
412impl OpCode for Unlink {
413    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
414        Ok(Decision::Blocking)
415    }
416
417    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
418        if self.dir {
419            syscall!(libc::rmdir(self.path.as_ptr()))?;
420        } else {
421            syscall!(libc::unlink(self.path.as_ptr()))?;
422        }
423        Poll::Ready(Ok(0))
424    }
425}
426
427impl OpCode for CreateDir {
428    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
429        Ok(Decision::Blocking)
430    }
431
432    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
433        syscall!(libc::mkdir(self.path.as_ptr(), self.mode))?;
434        Poll::Ready(Ok(0))
435    }
436}
437
438impl OpCode for Rename {
439    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
440        Ok(Decision::Blocking)
441    }
442
443    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
444        syscall!(libc::rename(self.old_path.as_ptr(), self.new_path.as_ptr()))?;
445        Poll::Ready(Ok(0))
446    }
447}
448
449impl OpCode for Symlink {
450    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
451        Ok(Decision::Blocking)
452    }
453
454    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
455        syscall!(libc::symlink(self.source.as_ptr(), self.target.as_ptr()))?;
456        Poll::Ready(Ok(0))
457    }
458}
459
460impl OpCode for HardLink {
461    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
462        Ok(Decision::Blocking)
463    }
464
465    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
466        syscall!(libc::link(self.source.as_ptr(), self.target.as_ptr()))?;
467        Poll::Ready(Ok(0))
468    }
469}
470
471impl CreateSocket {
472    unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
473        #[allow(unused_mut)]
474        let mut ty: i32 = self.socket_type;
475        #[cfg(any(
476            target_os = "android",
477            target_os = "dragonfly",
478            target_os = "freebsd",
479            target_os = "fuchsia",
480            target_os = "hurd",
481            target_os = "illumos",
482            target_os = "linux",
483            target_os = "netbsd",
484            target_os = "openbsd",
485            target_os = "cygwin",
486        ))]
487        {
488            ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
489        }
490        let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
491        let socket = Socket2::from_raw_fd(fd);
492        #[cfg(not(any(
493            target_os = "android",
494            target_os = "dragonfly",
495            target_os = "freebsd",
496            target_os = "fuchsia",
497            target_os = "hurd",
498            target_os = "illumos",
499            target_os = "linux",
500            target_os = "netbsd",
501            target_os = "openbsd",
502            target_os = "espidf",
503            target_os = "vita",
504            target_os = "cygwin",
505        )))]
506        socket.set_cloexec(true)?;
507        #[cfg(any(
508            target_os = "ios",
509            target_os = "macos",
510            target_os = "tvos",
511            target_os = "watchos",
512        ))]
513        socket.set_nosigpipe(true)?;
514        #[cfg(not(any(
515            target_os = "android",
516            target_os = "dragonfly",
517            target_os = "freebsd",
518            target_os = "fuchsia",
519            target_os = "hurd",
520            target_os = "illumos",
521            target_os = "linux",
522            target_os = "netbsd",
523            target_os = "openbsd",
524            target_os = "cygwin",
525        )))]
526        socket.set_nonblocking(true)?;
527        Ok(socket.into_raw_fd())
528    }
529}
530
531impl OpCode for CreateSocket {
532    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
533        Ok(Decision::Blocking)
534    }
535
536    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
537        Poll::Ready(Ok(unsafe { self.call()? } as _))
538    }
539}
540
541impl<S: AsFd> OpCode for ShutdownSocket<S> {
542    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
543        Ok(Decision::Blocking)
544    }
545
546    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
547        Poll::Ready(Ok(
548            syscall!(libc::shutdown(self.fd.as_fd().as_raw_fd(), self.how()))? as _,
549        ))
550    }
551}
552
553impl OpCode for CloseSocket {
554    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
555        Ok(Decision::Blocking)
556    }
557
558    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
559        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
560    }
561}
562
563impl<S: AsFd> Accept<S> {
564    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
565        let this = self.get_unchecked_mut();
566        #[cfg(any(
567            target_os = "android",
568            target_os = "dragonfly",
569            target_os = "freebsd",
570            target_os = "fuchsia",
571            target_os = "illumos",
572            target_os = "linux",
573            target_os = "netbsd",
574            target_os = "openbsd",
575            target_os = "cygwin",
576        ))]
577        {
578            libc::accept4(
579                this.fd.as_fd().as_raw_fd(),
580                &mut this.buffer as *mut _ as *mut _,
581                &mut this.addr_len,
582                libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
583            )
584        }
585        #[cfg(not(any(
586            target_os = "android",
587            target_os = "dragonfly",
588            target_os = "freebsd",
589            target_os = "fuchsia",
590            target_os = "illumos",
591            target_os = "linux",
592            target_os = "netbsd",
593            target_os = "openbsd",
594            target_os = "cygwin",
595        )))]
596        {
597            || -> io::Result<libc::c_int> {
598                let fd = syscall!(libc::accept(
599                    this.fd.as_fd().as_raw_fd(),
600                    &mut this.buffer as *mut _ as *mut _,
601                    &mut this.addr_len,
602                ))?;
603                let socket = Socket2::from_raw_fd(fd);
604                socket.set_cloexec(true)?;
605                socket.set_nonblocking(true)?;
606                Ok(socket.into_raw_fd())
607            }()
608            .unwrap_or(-1)
609        }
610    }
611}
612
613impl<S: AsFd> OpCode for Accept<S> {
614    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
615        let fd = self.fd.as_fd().as_raw_fd();
616        syscall!(self.as_mut().call(), wait_readable(fd))
617    }
618
619    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
620        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
621    }
622
623    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
624        let res = syscall!(break self.as_mut().call());
625        if let Poll::Ready(Ok(fd)) = res {
626            unsafe {
627                self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
628            }
629        }
630        res
631    }
632}
633
634impl<S: AsFd> OpCode for Connect<S> {
635    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
636        syscall!(
637            libc::connect(
638                self.fd.as_fd().as_raw_fd(),
639                self.addr.as_ptr().cast(),
640                self.addr.len()
641            ),
642            wait_writable(self.fd.as_fd().as_raw_fd())
643        )
644    }
645
646    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
647        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
648    }
649
650    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
651        let mut err: libc::c_int = 0;
652        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
653
654        syscall!(libc::getsockopt(
655            self.fd.as_fd().as_raw_fd(),
656            libc::SOL_SOCKET,
657            libc::SO_ERROR,
658            &mut err as *mut _ as *mut _,
659            &mut err_len
660        ))?;
661
662        let res = if err == 0 {
663            Ok(0)
664        } else {
665            Err(io::Error::from_raw_os_error(err))
666        };
667        Poll::Ready(res)
668    }
669}
670
671impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
672    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
673        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
674    }
675
676    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
677        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
678    }
679
680    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
681        let fd = self.fd.as_fd().as_raw_fd();
682        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
683        syscall!(break libc::read(fd, slice.as_mut_ptr() as _, slice.len()))
684    }
685}
686
687impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
688    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
689        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
690    }
691
692    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
693        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
694    }
695
696    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
697        let this = unsafe { self.get_unchecked_mut() };
698        this.slices = unsafe { this.buffer.io_slices_mut() };
699        syscall!(
700            break libc::readv(
701                this.fd.as_fd().as_raw_fd(),
702                this.slices.as_ptr() as _,
703                this.slices.len() as _
704            )
705        )
706    }
707}
708
709impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
710    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
711        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
712    }
713
714    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
715        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
716    }
717
718    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
719        let slice = self.buffer.as_slice();
720        syscall!(
721            break libc::write(
722                self.fd.as_fd().as_raw_fd(),
723                slice.as_ptr() as _,
724                slice.len()
725            )
726        )
727    }
728}
729
730impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
731    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
732        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
733    }
734
735    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
736        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
737    }
738
739    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
740        let this = unsafe { self.get_unchecked_mut() };
741        this.slices = unsafe { this.buffer.io_slices() };
742        syscall!(
743            break libc::writev(
744                this.fd.as_fd().as_raw_fd(),
745                this.slices.as_ptr() as _,
746                this.slices.len() as _
747            )
748        )
749    }
750}
751
752impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
753    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
754        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
755    }
756
757    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
758        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
759    }
760
761    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
762        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
763    }
764}
765
766/// Receive data and source address.
767pub struct RecvFrom<T: IoBufMut, S> {
768    pub(crate) fd: S,
769    pub(crate) buffer: T,
770    pub(crate) addr: SockAddrStorage,
771    pub(crate) addr_len: socklen_t,
772    _p: PhantomPinned,
773}
774
775impl<T: IoBufMut, S> RecvFrom<T, S> {
776    /// Create [`RecvFrom`].
777    pub fn new(fd: S, buffer: T) -> Self {
778        let addr = SockAddrStorage::zeroed();
779        let addr_len = addr.size_of();
780        Self {
781            fd,
782            buffer,
783            addr,
784            addr_len,
785            _p: PhantomPinned,
786        }
787    }
788}
789
790impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
791    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
792        let this = self.get_unchecked_mut();
793        let fd = this.fd.as_fd().as_raw_fd();
794        let slice = this.buffer.as_mut_slice();
795        libc::recvfrom(
796            fd,
797            slice.as_mut_ptr() as _,
798            slice.len(),
799            0,
800            &mut this.addr as *mut _ as _,
801            &mut this.addr_len,
802        )
803    }
804}
805
806impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
807    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
808        let fd = self.fd.as_fd().as_raw_fd();
809        syscall!(self.as_mut().call(), wait_readable(fd))
810    }
811
812    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
813        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
814    }
815
816    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
817        syscall!(break self.as_mut().call())
818    }
819}
820
821impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
822    type Inner = (T, SockAddrStorage, socklen_t);
823
824    fn into_inner(self) -> Self::Inner {
825        (self.buffer, self.addr, self.addr_len)
826    }
827}
828
829/// Receive data and source address into vectored buffer.
830pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
831    pub(crate) fd: S,
832    pub(crate) buffer: T,
833    pub(crate) slices: Vec<IoSliceMut>,
834    pub(crate) addr: SockAddrStorage,
835    pub(crate) msg: libc::msghdr,
836    _p: PhantomPinned,
837}
838
839impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
840    /// Create [`RecvFromVectored`].
841    pub fn new(fd: S, buffer: T) -> Self {
842        Self {
843            fd,
844            buffer,
845            slices: vec![],
846            addr: SockAddrStorage::zeroed(),
847            msg: unsafe { std::mem::zeroed() },
848            _p: PhantomPinned,
849        }
850    }
851}
852
853impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
854    fn set_msg(&mut self) {
855        self.slices = unsafe { self.buffer.io_slices_mut() };
856        self.msg.msg_name = &mut self.addr as *mut _ as _;
857        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
858        self.msg.msg_iov = self.slices.as_mut_ptr() as _;
859        self.msg.msg_iovlen = self.slices.len() as _;
860    }
861
862    unsafe fn call(&mut self) -> libc::ssize_t {
863        libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0)
864    }
865}
866
867impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
868    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
869        let this = unsafe { self.get_unchecked_mut() };
870        this.set_msg();
871        syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
872    }
873
874    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
875        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
876    }
877
878    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
879        let this = unsafe { self.get_unchecked_mut() };
880        syscall!(break this.call())
881    }
882}
883
884impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
885    type Inner = (T, SockAddrStorage, socklen_t);
886
887    fn into_inner(self) -> Self::Inner {
888        (self.buffer, self.addr, self.msg.msg_namelen)
889    }
890}
891
892/// Send data to specified address.
893pub struct SendTo<T: IoBuf, S> {
894    pub(crate) fd: S,
895    pub(crate) buffer: T,
896    pub(crate) addr: SockAddr,
897    _p: PhantomPinned,
898}
899
900impl<T: IoBuf, S> SendTo<T, S> {
901    /// Create [`SendTo`].
902    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
903        Self {
904            fd,
905            buffer,
906            addr,
907            _p: PhantomPinned,
908        }
909    }
910}
911
912impl<T: IoBuf, S: AsFd> SendTo<T, S> {
913    unsafe fn call(&self) -> libc::ssize_t {
914        let slice = self.buffer.as_slice();
915        libc::sendto(
916            self.fd.as_fd().as_raw_fd(),
917            slice.as_ptr() as _,
918            slice.len(),
919            0,
920            self.addr.as_ptr().cast(),
921            self.addr.len(),
922        )
923    }
924}
925
926impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
927    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
928        syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
929    }
930
931    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
932        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
933    }
934
935    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
936        syscall!(break self.call())
937    }
938}
939
940impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
941    type Inner = T;
942
943    fn into_inner(self) -> Self::Inner {
944        self.buffer
945    }
946}
947
948/// Send data to specified address from vectored buffer.
949pub struct SendToVectored<T: IoVectoredBuf, S> {
950    pub(crate) fd: S,
951    pub(crate) buffer: T,
952    pub(crate) addr: SockAddr,
953    pub(crate) slices: Vec<IoSlice>,
954    pub(crate) msg: libc::msghdr,
955    _p: PhantomPinned,
956}
957
958impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
959    /// Create [`SendToVectored`].
960    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
961        Self {
962            fd,
963            buffer,
964            addr,
965            slices: vec![],
966            msg: unsafe { std::mem::zeroed() },
967            _p: PhantomPinned,
968        }
969    }
970}
971
972impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
973    fn set_msg(&mut self) {
974        self.slices = unsafe { self.buffer.io_slices() };
975        self.msg.msg_name = &mut self.addr as *mut _ as _;
976        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
977        self.msg.msg_iov = self.slices.as_mut_ptr() as _;
978        self.msg.msg_iovlen = self.slices.len() as _;
979    }
980
981    unsafe fn call(&self) -> libc::ssize_t {
982        libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0)
983    }
984}
985
986impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
987    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
988        let this = unsafe { self.get_unchecked_mut() };
989        this.set_msg();
990        syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
991    }
992
993    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
994        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
995    }
996
997    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
998        syscall!(break self.call())
999    }
1000}
1001
1002impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1003    type Inner = T;
1004
1005    fn into_inner(self) -> Self::Inner {
1006        self.buffer
1007    }
1008}
1009
1010impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1011    unsafe fn call(&mut self) -> libc::ssize_t {
1012        libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0)
1013    }
1014}
1015
1016impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1017    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1018        let this = unsafe { self.get_unchecked_mut() };
1019        unsafe { this.set_msg() };
1020        syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
1021    }
1022
1023    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1024        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1025    }
1026
1027    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1028        let this = unsafe { self.get_unchecked_mut() };
1029        syscall!(break this.call())
1030    }
1031}
1032
1033impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1034    unsafe fn call(&self) -> libc::ssize_t {
1035        libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0)
1036    }
1037}
1038
1039impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1040    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1041        let this = unsafe { self.get_unchecked_mut() };
1042        unsafe { this.set_msg() };
1043        syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
1044    }
1045
1046    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1047        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1048    }
1049
1050    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1051        syscall!(break self.call())
1052    }
1053}
1054
1055impl<S: AsFd> OpCode for PollOnce<S> {
1056    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1057        Ok(Decision::wait_for(
1058            self.fd.as_fd().as_raw_fd(),
1059            self.interest,
1060        ))
1061    }
1062
1063    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1064        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1065    }
1066
1067    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1068        Poll::Ready(Ok(0))
1069    }
1070}