compio_driver/poll/
op.rs

1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4    ffi::CString,
5    io,
6    marker::PhantomPinned,
7    os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8    pin::Pin,
9    task::Poll,
10};
11
12use compio_buf::{
13    BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
14};
15#[cfg(not(gnulinux))]
16use libc::open;
17#[cfg(gnulinux)]
18use libc::open64 as open;
19#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
20use libc::{pread, preadv, pwrite, pwritev};
21#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
22use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
23use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
24
25use super::{AsFd, Decision, OpCode, OpType, syscall};
26use crate::op::*;
27pub use crate::unix::op::*;
28
29impl<
30    D: std::marker::Send + 'static,
31    F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
32> OpCode for Asyncify<F, D>
33{
34    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
35        Ok(Decision::Blocking)
36    }
37
38    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
39        // SAFETY: self won't be moved
40        let this = unsafe { self.get_unchecked_mut() };
41        let f = this
42            .f
43            .take()
44            .expect("the operate method could only be called once");
45        let BufResult(res, data) = f();
46        this.data = Some(data);
47        Poll::Ready(res)
48    }
49}
50
51impl<
52    S,
53    D: std::marker::Send + 'static,
54    F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
55> OpCode for AsyncifyFd<S, F, D>
56{
57    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
58        Ok(Decision::Blocking)
59    }
60
61    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
62        // SAFETY: self won't be moved
63        let this = unsafe { self.get_unchecked_mut() };
64        let f = this
65            .f
66            .take()
67            .expect("the operate method could only be called once");
68        let BufResult(res, data) = f(&this.fd);
69        this.data = Some(data);
70        Poll::Ready(res)
71    }
72}
73
74impl OpCode for OpenFile {
75    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
76        Ok(Decision::Blocking)
77    }
78
79    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
80        Poll::Ready(Ok(syscall!(open(
81            self.path.as_ptr(),
82            self.flags | libc::O_CLOEXEC,
83            self.mode as libc::c_int
84        ))? as _))
85    }
86}
87
88impl OpCode for CloseFile {
89    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
90        Ok(Decision::Blocking)
91    }
92
93    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
94        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
95    }
96}
97
98/// Get metadata of an opened file.
99pub struct FileStat<S> {
100    pub(crate) fd: S,
101    pub(crate) stat: libc::stat,
102}
103
104impl<S> FileStat<S> {
105    /// Create [`FileStat`].
106    pub fn new(fd: S) -> Self {
107        Self {
108            fd,
109            stat: unsafe { std::mem::zeroed() },
110        }
111    }
112}
113
114impl<S: AsFd> OpCode for FileStat<S> {
115    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
116        Ok(Decision::Blocking)
117    }
118
119    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
120        let this = unsafe { self.get_unchecked_mut() };
121        #[cfg(gnulinux)]
122        {
123            let mut s: libc::statx = unsafe { std::mem::zeroed() };
124            static EMPTY_NAME: &[u8] = b"\0";
125            syscall!(libc::statx(
126                this.fd.as_fd().as_raw_fd(),
127                EMPTY_NAME.as_ptr().cast(),
128                libc::AT_EMPTY_PATH,
129                0,
130                &mut s
131            ))?;
132            this.stat = statx_to_stat(s);
133            Poll::Ready(Ok(0))
134        }
135        #[cfg(not(gnulinux))]
136        {
137            Poll::Ready(Ok(
138                syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut this.stat))? as _,
139            ))
140        }
141    }
142}
143
144impl<S> IntoInner for FileStat<S> {
145    type Inner = libc::stat;
146
147    fn into_inner(self) -> Self::Inner {
148        self.stat
149    }
150}
151
152/// Get metadata from path.
153pub struct PathStat {
154    pub(crate) path: CString,
155    pub(crate) stat: libc::stat,
156    pub(crate) follow_symlink: bool,
157}
158
159impl PathStat {
160    /// Create [`PathStat`].
161    pub fn new(path: CString, follow_symlink: bool) -> Self {
162        Self {
163            path,
164            stat: unsafe { std::mem::zeroed() },
165            follow_symlink,
166        }
167    }
168}
169
170impl OpCode for PathStat {
171    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
172        Ok(Decision::Blocking)
173    }
174
175    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
176        #[cfg(gnulinux)]
177        {
178            let mut flags = libc::AT_EMPTY_PATH;
179            if !self.follow_symlink {
180                flags |= libc::AT_SYMLINK_NOFOLLOW;
181            }
182            let mut s: libc::statx = unsafe { std::mem::zeroed() };
183            syscall!(libc::statx(
184                libc::AT_FDCWD,
185                self.path.as_ptr(),
186                flags,
187                0,
188                &mut s
189            ))?;
190            self.stat = statx_to_stat(s);
191            Poll::Ready(Ok(0))
192        }
193        #[cfg(not(gnulinux))]
194        {
195            let f = if self.follow_symlink {
196                libc::stat
197            } else {
198                libc::lstat
199            };
200            Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
201        }
202    }
203}
204
205impl IntoInner for PathStat {
206    type Inner = libc::stat;
207
208    fn into_inner(self) -> Self::Inner {
209        self.stat
210    }
211}
212
213impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
214    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
215        #[cfg(aio)]
216        {
217            let this = unsafe { self.get_unchecked_mut() };
218            let slice = this.buffer.as_mut_slice();
219
220            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
221            this.aiocb.aio_offset = this.offset as _;
222            this.aiocb.aio_buf = slice.as_mut_ptr().cast();
223            this.aiocb.aio_nbytes = slice.len();
224
225            Ok(Decision::aio(&mut this.aiocb, libc::aio_read))
226        }
227        #[cfg(not(aio))]
228        {
229            Ok(Decision::Blocking)
230        }
231    }
232
233    #[cfg(aio)]
234    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
235        Some(OpType::Aio(NonNull::from(
236            &mut unsafe { self.get_unchecked_mut() }.aiocb,
237        )))
238    }
239
240    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
241        let fd = self.fd.as_fd().as_raw_fd();
242        let offset = self.offset;
243        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
244        syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, offset as _,))
245    }
246}
247
248impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
249    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
250        #[cfg(freebsd)]
251        {
252            let this = unsafe { self.get_unchecked_mut() };
253            this.slices = unsafe { this.buffer.io_slices_mut() };
254
255            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
256            this.aiocb.aio_offset = this.offset as _;
257            this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
258            this.aiocb.aio_nbytes = this.slices.len();
259
260            Ok(Decision::aio(&mut this.aiocb, libc::aio_readv))
261        }
262        #[cfg(not(freebsd))]
263        {
264            Ok(Decision::Blocking)
265        }
266    }
267
268    #[cfg(freebsd)]
269    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
270        Some(OpType::Aio(NonNull::from(
271            &mut unsafe { self.get_unchecked_mut() }.aiocb,
272        )))
273    }
274
275    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
276        let this = unsafe { self.get_unchecked_mut() };
277        this.slices = unsafe { this.buffer.io_slices_mut() };
278        syscall!(
279            break preadv(
280                this.fd.as_fd().as_raw_fd(),
281                this.slices.as_ptr() as _,
282                this.slices.len() as _,
283                this.offset as _,
284            )
285        )
286    }
287}
288
289impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
290    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
291        #[cfg(aio)]
292        {
293            let this = unsafe { self.get_unchecked_mut() };
294            let slice = this.buffer.as_slice();
295
296            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
297            this.aiocb.aio_offset = this.offset as _;
298            this.aiocb.aio_buf = slice.as_ptr().cast_mut().cast();
299            this.aiocb.aio_nbytes = slice.len();
300
301            Ok(Decision::aio(&mut this.aiocb, libc::aio_write))
302        }
303        #[cfg(not(aio))]
304        {
305            Ok(Decision::Blocking)
306        }
307    }
308
309    #[cfg(aio)]
310    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
311        Some(OpType::Aio(NonNull::from(
312            &mut unsafe { self.get_unchecked_mut() }.aiocb,
313        )))
314    }
315
316    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
317        let slice = self.buffer.as_slice();
318        syscall!(
319            break pwrite(
320                self.fd.as_fd().as_raw_fd(),
321                slice.as_ptr() as _,
322                slice.len() as _,
323                self.offset as _,
324            )
325        )
326    }
327}
328
329impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
330    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
331        #[cfg(freebsd)]
332        {
333            let this = unsafe { self.get_unchecked_mut() };
334            this.slices = unsafe { this.buffer.io_slices() };
335
336            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
337            this.aiocb.aio_offset = this.offset as _;
338            this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
339            this.aiocb.aio_nbytes = this.slices.len();
340
341            Ok(Decision::aio(&mut this.aiocb, libc::aio_writev))
342        }
343        #[cfg(not(freebsd))]
344        {
345            Ok(Decision::Blocking)
346        }
347    }
348
349    #[cfg(freebsd)]
350    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
351        Some(OpType::Aio(NonNull::from(
352            &mut unsafe { self.get_unchecked_mut() }.aiocb,
353        )))
354    }
355
356    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
357        let this = unsafe { self.get_unchecked_mut() };
358        this.slices = unsafe { this.buffer.io_slices() };
359        syscall!(
360            break pwritev(
361                this.fd.as_fd().as_raw_fd(),
362                this.slices.as_ptr() as _,
363                this.slices.len() as _,
364                this.offset as _,
365            )
366        )
367    }
368}
369
370impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
371    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
372        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
373    }
374
375    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
376        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
377    }
378
379    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
380        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
381    }
382}
383
384impl<S: AsFd> OpCode for Sync<S> {
385    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
386        #[cfg(aio)]
387        {
388            unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
389                libc::aio_fsync(libc::O_SYNC, aiocbp)
390            }
391            unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
392                libc::aio_fsync(libc::O_DSYNC, aiocbp)
393            }
394
395            let this = unsafe { self.get_unchecked_mut() };
396            this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
397
398            let f = if this.datasync {
399                aio_fdatasync
400            } else {
401                aio_fsync
402            };
403
404            Ok(Decision::aio(&mut this.aiocb, f))
405        }
406        #[cfg(not(aio))]
407        {
408            Ok(Decision::Blocking)
409        }
410    }
411
412    #[cfg(aio)]
413    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
414        Some(OpType::Aio(NonNull::from(
415            &mut unsafe { self.get_unchecked_mut() }.aiocb,
416        )))
417    }
418
419    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
420        #[cfg(datasync)]
421        {
422            Poll::Ready(Ok(syscall!(if self.datasync {
423                libc::fdatasync(self.fd.as_fd().as_raw_fd())
424            } else {
425                libc::fsync(self.fd.as_fd().as_raw_fd())
426            })? as _))
427        }
428        #[cfg(not(datasync))]
429        {
430            Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
431        }
432    }
433}
434
435impl OpCode for Unlink {
436    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
437        Ok(Decision::Blocking)
438    }
439
440    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
441        if self.dir {
442            syscall!(libc::rmdir(self.path.as_ptr()))?;
443        } else {
444            syscall!(libc::unlink(self.path.as_ptr()))?;
445        }
446        Poll::Ready(Ok(0))
447    }
448}
449
450impl OpCode for CreateDir {
451    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
452        Ok(Decision::Blocking)
453    }
454
455    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
456        syscall!(libc::mkdir(self.path.as_ptr(), self.mode))?;
457        Poll::Ready(Ok(0))
458    }
459}
460
461impl OpCode for Rename {
462    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
463        Ok(Decision::Blocking)
464    }
465
466    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
467        syscall!(libc::rename(self.old_path.as_ptr(), self.new_path.as_ptr()))?;
468        Poll::Ready(Ok(0))
469    }
470}
471
472impl OpCode for Symlink {
473    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
474        Ok(Decision::Blocking)
475    }
476
477    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
478        syscall!(libc::symlink(self.source.as_ptr(), self.target.as_ptr()))?;
479        Poll::Ready(Ok(0))
480    }
481}
482
483impl OpCode for HardLink {
484    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
485        Ok(Decision::Blocking)
486    }
487
488    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
489        syscall!(libc::link(self.source.as_ptr(), self.target.as_ptr()))?;
490        Poll::Ready(Ok(0))
491    }
492}
493
494impl CreateSocket {
495    unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
496        #[allow(unused_mut)]
497        let mut ty: i32 = self.socket_type;
498        #[cfg(any(
499            target_os = "android",
500            target_os = "dragonfly",
501            target_os = "freebsd",
502            target_os = "fuchsia",
503            target_os = "hurd",
504            target_os = "illumos",
505            target_os = "linux",
506            target_os = "netbsd",
507            target_os = "openbsd",
508            target_os = "cygwin",
509        ))]
510        {
511            ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
512        }
513        let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
514        let socket = unsafe { Socket2::from_raw_fd(fd) };
515        #[cfg(not(any(
516            target_os = "android",
517            target_os = "dragonfly",
518            target_os = "freebsd",
519            target_os = "fuchsia",
520            target_os = "hurd",
521            target_os = "illumos",
522            target_os = "linux",
523            target_os = "netbsd",
524            target_os = "openbsd",
525            target_os = "espidf",
526            target_os = "vita",
527            target_os = "cygwin",
528        )))]
529        socket.set_cloexec(true)?;
530        #[cfg(any(
531            target_os = "ios",
532            target_os = "macos",
533            target_os = "tvos",
534            target_os = "watchos",
535        ))]
536        socket.set_nosigpipe(true)?;
537        #[cfg(not(any(
538            target_os = "android",
539            target_os = "dragonfly",
540            target_os = "freebsd",
541            target_os = "fuchsia",
542            target_os = "hurd",
543            target_os = "illumos",
544            target_os = "linux",
545            target_os = "netbsd",
546            target_os = "openbsd",
547            target_os = "cygwin",
548        )))]
549        socket.set_nonblocking(true)?;
550        Ok(socket.into_raw_fd())
551    }
552}
553
554impl OpCode for CreateSocket {
555    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
556        Ok(Decision::Blocking)
557    }
558
559    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
560        Poll::Ready(Ok(unsafe { self.call()? } as _))
561    }
562}
563
564impl<S: AsFd> OpCode for ShutdownSocket<S> {
565    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
566        Ok(Decision::Blocking)
567    }
568
569    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
570        Poll::Ready(Ok(
571            syscall!(libc::shutdown(self.fd.as_fd().as_raw_fd(), self.how()))? as _,
572        ))
573    }
574}
575
576impl OpCode for CloseSocket {
577    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
578        Ok(Decision::Blocking)
579    }
580
581    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
582        Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
583    }
584}
585
586impl<S: AsFd> Accept<S> {
587    unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
588        let this = unsafe { self.get_unchecked_mut() };
589        #[cfg(any(
590            target_os = "android",
591            target_os = "dragonfly",
592            target_os = "freebsd",
593            target_os = "fuchsia",
594            target_os = "illumos",
595            target_os = "linux",
596            target_os = "netbsd",
597            target_os = "openbsd",
598            target_os = "cygwin",
599        ))]
600        unsafe {
601            libc::accept4(
602                this.fd.as_fd().as_raw_fd(),
603                &mut this.buffer as *mut _ as *mut _,
604                &mut this.addr_len,
605                libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
606            )
607        }
608        #[cfg(not(any(
609            target_os = "android",
610            target_os = "dragonfly",
611            target_os = "freebsd",
612            target_os = "fuchsia",
613            target_os = "illumos",
614            target_os = "linux",
615            target_os = "netbsd",
616            target_os = "openbsd",
617            target_os = "cygwin",
618        )))]
619        {
620            || -> io::Result<libc::c_int> {
621                let fd = syscall!(libc::accept(
622                    this.fd.as_fd().as_raw_fd(),
623                    &mut this.buffer as *mut _ as *mut _,
624                    &mut this.addr_len,
625                ))?;
626                let socket = unsafe { Socket2::from_raw_fd(fd) };
627                socket.set_cloexec(true)?;
628                socket.set_nonblocking(true)?;
629                Ok(socket.into_raw_fd())
630            }()
631            .unwrap_or(-1)
632        }
633    }
634}
635
636impl<S: AsFd> OpCode for Accept<S> {
637    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
638        let fd = self.fd.as_fd().as_raw_fd();
639        syscall!(self.as_mut().call(), wait_readable(fd))
640    }
641
642    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
643        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
644    }
645
646    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
647        let res = syscall!(break self.as_mut().call());
648        if let Poll::Ready(Ok(fd)) = res {
649            unsafe {
650                self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
651            }
652        }
653        res
654    }
655}
656
657impl<S: AsFd> OpCode for Connect<S> {
658    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
659        syscall!(
660            libc::connect(
661                self.fd.as_fd().as_raw_fd(),
662                self.addr.as_ptr().cast(),
663                self.addr.len()
664            ),
665            wait_writable(self.fd.as_fd().as_raw_fd())
666        )
667    }
668
669    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
670        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
671    }
672
673    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
674        let mut err: libc::c_int = 0;
675        let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
676
677        syscall!(libc::getsockopt(
678            self.fd.as_fd().as_raw_fd(),
679            libc::SOL_SOCKET,
680            libc::SO_ERROR,
681            &mut err as *mut _ as *mut _,
682            &mut err_len
683        ))?;
684
685        let res = if err == 0 {
686            Ok(0)
687        } else {
688            Err(io::Error::from_raw_os_error(err))
689        };
690        Poll::Ready(res)
691    }
692}
693
694impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
695    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
696        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
697    }
698
699    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
700        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
701    }
702
703    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
704        let fd = self.fd.as_fd().as_raw_fd();
705        let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
706        syscall!(break libc::read(fd, slice.as_mut_ptr() as _, slice.len()))
707    }
708}
709
710impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
711    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
712        Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
713    }
714
715    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
716        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
717    }
718
719    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
720        let this = unsafe { self.get_unchecked_mut() };
721        this.slices = unsafe { this.buffer.io_slices_mut() };
722        syscall!(
723            break libc::readv(
724                this.fd.as_fd().as_raw_fd(),
725                this.slices.as_ptr() as _,
726                this.slices.len() as _
727            )
728        )
729    }
730}
731
732impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
733    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
734        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
735    }
736
737    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
738        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
739    }
740
741    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
742        let slice = self.buffer.as_slice();
743        syscall!(
744            break libc::write(
745                self.fd.as_fd().as_raw_fd(),
746                slice.as_ptr() as _,
747                slice.len()
748            )
749        )
750    }
751}
752
753impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
754    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
755        Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
756    }
757
758    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
759        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
760    }
761
762    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
763        let this = unsafe { self.get_unchecked_mut() };
764        this.slices = unsafe { this.buffer.io_slices() };
765        syscall!(
766            break libc::writev(
767                this.fd.as_fd().as_raw_fd(),
768                this.slices.as_ptr() as _,
769                this.slices.len() as _
770            )
771        )
772    }
773}
774
775impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
776    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
777        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
778    }
779
780    fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
781        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
782    }
783
784    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
785        unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
786    }
787}
788
789/// Receive data and source address.
790pub struct RecvFrom<T: IoBufMut, S> {
791    pub(crate) fd: S,
792    pub(crate) buffer: T,
793    pub(crate) addr: SockAddrStorage,
794    pub(crate) addr_len: socklen_t,
795    _p: PhantomPinned,
796}
797
798impl<T: IoBufMut, S> RecvFrom<T, S> {
799    /// Create [`RecvFrom`].
800    pub fn new(fd: S, buffer: T) -> Self {
801        let addr = SockAddrStorage::zeroed();
802        let addr_len = addr.size_of();
803        Self {
804            fd,
805            buffer,
806            addr,
807            addr_len,
808            _p: PhantomPinned,
809        }
810    }
811}
812
813impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
814    unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
815        let this = unsafe { self.get_unchecked_mut() };
816        let fd = this.fd.as_fd().as_raw_fd();
817        let slice = this.buffer.as_mut_slice();
818        unsafe {
819            libc::recvfrom(
820                fd,
821                slice.as_mut_ptr() as _,
822                slice.len(),
823                0,
824                &mut this.addr as *mut _ as _,
825                &mut this.addr_len,
826            )
827        }
828    }
829}
830
831impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
832    fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
833        let fd = self.fd.as_fd().as_raw_fd();
834        syscall!(self.as_mut().call(), wait_readable(fd))
835    }
836
837    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
838        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
839    }
840
841    fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
842        syscall!(break self.as_mut().call())
843    }
844}
845
846impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
847    type Inner = (T, SockAddrStorage, socklen_t);
848
849    fn into_inner(self) -> Self::Inner {
850        (self.buffer, self.addr, self.addr_len)
851    }
852}
853
854/// Receive data and source address into vectored buffer.
855pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
856    pub(crate) fd: S,
857    pub(crate) buffer: T,
858    pub(crate) slices: Vec<IoSliceMut>,
859    pub(crate) addr: SockAddrStorage,
860    pub(crate) msg: libc::msghdr,
861    _p: PhantomPinned,
862}
863
864impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
865    /// Create [`RecvFromVectored`].
866    pub fn new(fd: S, buffer: T) -> Self {
867        Self {
868            fd,
869            buffer,
870            slices: vec![],
871            addr: SockAddrStorage::zeroed(),
872            msg: unsafe { std::mem::zeroed() },
873            _p: PhantomPinned,
874        }
875    }
876}
877
878impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
879    fn set_msg(&mut self) {
880        self.slices = unsafe { self.buffer.io_slices_mut() };
881        self.msg.msg_name = &mut self.addr as *mut _ as _;
882        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
883        self.msg.msg_iov = self.slices.as_mut_ptr() as _;
884        self.msg.msg_iovlen = self.slices.len() as _;
885    }
886
887    unsafe fn call(&mut self) -> libc::ssize_t {
888        unsafe { libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0) }
889    }
890}
891
892impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
893    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
894        let this = unsafe { self.get_unchecked_mut() };
895        this.set_msg();
896        syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
897    }
898
899    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
900        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
901    }
902
903    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
904        let this = unsafe { self.get_unchecked_mut() };
905        syscall!(break this.call())
906    }
907}
908
909impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
910    type Inner = (T, SockAddrStorage, socklen_t);
911
912    fn into_inner(self) -> Self::Inner {
913        (self.buffer, self.addr, self.msg.msg_namelen)
914    }
915}
916
917/// Send data to specified address.
918pub struct SendTo<T: IoBuf, S> {
919    pub(crate) fd: S,
920    pub(crate) buffer: T,
921    pub(crate) addr: SockAddr,
922    _p: PhantomPinned,
923}
924
925impl<T: IoBuf, S> SendTo<T, S> {
926    /// Create [`SendTo`].
927    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
928        Self {
929            fd,
930            buffer,
931            addr,
932            _p: PhantomPinned,
933        }
934    }
935}
936
937impl<T: IoBuf, S: AsFd> SendTo<T, S> {
938    unsafe fn call(&self) -> libc::ssize_t {
939        let slice = self.buffer.as_slice();
940        unsafe {
941            libc::sendto(
942                self.fd.as_fd().as_raw_fd(),
943                slice.as_ptr() as _,
944                slice.len(),
945                0,
946                self.addr.as_ptr().cast(),
947                self.addr.len(),
948            )
949        }
950    }
951}
952
953impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
954    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
955        syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
956    }
957
958    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
959        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
960    }
961
962    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
963        syscall!(break self.call())
964    }
965}
966
967impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
968    type Inner = T;
969
970    fn into_inner(self) -> Self::Inner {
971        self.buffer
972    }
973}
974
975/// Send data to specified address from vectored buffer.
976pub struct SendToVectored<T: IoVectoredBuf, S> {
977    pub(crate) fd: S,
978    pub(crate) buffer: T,
979    pub(crate) addr: SockAddr,
980    pub(crate) slices: Vec<IoSlice>,
981    pub(crate) msg: libc::msghdr,
982    _p: PhantomPinned,
983}
984
985impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
986    /// Create [`SendToVectored`].
987    pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
988        Self {
989            fd,
990            buffer,
991            addr,
992            slices: vec![],
993            msg: unsafe { std::mem::zeroed() },
994            _p: PhantomPinned,
995        }
996    }
997}
998
999impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
1000    fn set_msg(&mut self) {
1001        self.slices = unsafe { self.buffer.io_slices() };
1002        self.msg.msg_name = &mut self.addr as *mut _ as _;
1003        self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
1004        self.msg.msg_iov = self.slices.as_mut_ptr() as _;
1005        self.msg.msg_iovlen = self.slices.len() as _;
1006    }
1007
1008    unsafe fn call(&self) -> libc::ssize_t {
1009        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0) }
1010    }
1011}
1012
1013impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1014    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1015        let this = unsafe { self.get_unchecked_mut() };
1016        this.set_msg();
1017        syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
1018    }
1019
1020    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1021        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1022    }
1023
1024    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1025        syscall!(break self.call())
1026    }
1027}
1028
1029impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1030    type Inner = T;
1031
1032    fn into_inner(self) -> Self::Inner {
1033        self.buffer
1034    }
1035}
1036
1037impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1038    unsafe fn call(&mut self) -> libc::ssize_t {
1039        unsafe { libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0) }
1040    }
1041}
1042
1043impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1044    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1045        let this = unsafe { self.get_unchecked_mut() };
1046        unsafe { this.set_msg() };
1047        syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
1048    }
1049
1050    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1051        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1052    }
1053
1054    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1055        let this = unsafe { self.get_unchecked_mut() };
1056        syscall!(break this.call())
1057    }
1058}
1059
1060impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1061    unsafe fn call(&self) -> libc::ssize_t {
1062        unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0) }
1063    }
1064}
1065
1066impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1067    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1068        let this = unsafe { self.get_unchecked_mut() };
1069        unsafe { this.set_msg() };
1070        syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
1071    }
1072
1073    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1074        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1075    }
1076
1077    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1078        syscall!(break self.call())
1079    }
1080}
1081
1082impl<S: AsFd> OpCode for PollOnce<S> {
1083    fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1084        Ok(Decision::wait_for(
1085            self.fd.as_fd().as_raw_fd(),
1086            self.interest,
1087        ))
1088    }
1089
1090    fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1091        Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1092    }
1093
1094    fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1095        Poll::Ready(Ok(0))
1096    }
1097}