1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4 ffi::CString,
5 io,
6 marker::PhantomPinned,
7 os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8 pin::Pin,
9 task::Poll,
10};
11
12use compio_buf::{
13 BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
14};
15#[cfg(not(gnulinux))]
16use libc::open;
17#[cfg(gnulinux)]
18use libc::open64 as open;
19#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
20use libc::{pread, preadv, pwrite, pwritev};
21#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
22use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
23use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
24
25use super::{AsFd, Decision, OpCode, OpType, syscall};
26use crate::op::*;
27pub use crate::unix::op::*;
28
29impl<
30 D: std::marker::Send + 'static,
31 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
32> OpCode for Asyncify<F, D>
33{
34 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
35 Ok(Decision::Blocking)
36 }
37
38 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
39 let this = unsafe { self.get_unchecked_mut() };
41 let f = this
42 .f
43 .take()
44 .expect("the operate method could only be called once");
45 let BufResult(res, data) = f();
46 this.data = Some(data);
47 Poll::Ready(res)
48 }
49}
50
51impl OpCode for OpenFile {
52 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
53 Ok(Decision::Blocking)
54 }
55
56 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
57 Poll::Ready(Ok(syscall!(open(
58 self.path.as_ptr(),
59 self.flags | libc::O_CLOEXEC,
60 self.mode as libc::c_int
61 ))? as _))
62 }
63}
64
65impl OpCode for CloseFile {
66 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
67 Ok(Decision::Blocking)
68 }
69
70 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
71 Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
72 }
73}
74
75pub struct FileStat<S> {
77 pub(crate) fd: S,
78 pub(crate) stat: libc::stat,
79}
80
81impl<S> FileStat<S> {
82 pub fn new(fd: S) -> Self {
84 Self {
85 fd,
86 stat: unsafe { std::mem::zeroed() },
87 }
88 }
89}
90
91impl<S: AsFd> OpCode for FileStat<S> {
92 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
93 Ok(Decision::Blocking)
94 }
95
96 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
97 let this = unsafe { self.get_unchecked_mut() };
98 #[cfg(gnulinux)]
99 {
100 let mut s: libc::statx = unsafe { std::mem::zeroed() };
101 static EMPTY_NAME: &[u8] = b"\0";
102 syscall!(libc::statx(
103 this.fd.as_fd().as_raw_fd(),
104 EMPTY_NAME.as_ptr().cast(),
105 libc::AT_EMPTY_PATH,
106 0,
107 &mut s
108 ))?;
109 this.stat = statx_to_stat(s);
110 Poll::Ready(Ok(0))
111 }
112 #[cfg(not(gnulinux))]
113 {
114 Poll::Ready(Ok(
115 syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut this.stat))? as _,
116 ))
117 }
118 }
119}
120
121impl<S> IntoInner for FileStat<S> {
122 type Inner = libc::stat;
123
124 fn into_inner(self) -> Self::Inner {
125 self.stat
126 }
127}
128
129pub struct PathStat {
131 pub(crate) path: CString,
132 pub(crate) stat: libc::stat,
133 pub(crate) follow_symlink: bool,
134}
135
136impl PathStat {
137 pub fn new(path: CString, follow_symlink: bool) -> Self {
139 Self {
140 path,
141 stat: unsafe { std::mem::zeroed() },
142 follow_symlink,
143 }
144 }
145}
146
147impl OpCode for PathStat {
148 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
149 Ok(Decision::Blocking)
150 }
151
152 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
153 #[cfg(gnulinux)]
154 {
155 let mut flags = libc::AT_EMPTY_PATH;
156 if !self.follow_symlink {
157 flags |= libc::AT_SYMLINK_NOFOLLOW;
158 }
159 let mut s: libc::statx = unsafe { std::mem::zeroed() };
160 syscall!(libc::statx(
161 libc::AT_FDCWD,
162 self.path.as_ptr(),
163 flags,
164 0,
165 &mut s
166 ))?;
167 self.stat = statx_to_stat(s);
168 Poll::Ready(Ok(0))
169 }
170 #[cfg(not(gnulinux))]
171 {
172 let f = if self.follow_symlink {
173 libc::stat
174 } else {
175 libc::lstat
176 };
177 Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
178 }
179 }
180}
181
182impl IntoInner for PathStat {
183 type Inner = libc::stat;
184
185 fn into_inner(self) -> Self::Inner {
186 self.stat
187 }
188}
189
190impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
191 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
192 #[cfg(aio)]
193 {
194 let this = unsafe { self.get_unchecked_mut() };
195 let slice = this.buffer.as_mut_slice();
196
197 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
198 this.aiocb.aio_offset = this.offset as _;
199 this.aiocb.aio_buf = slice.as_mut_ptr().cast();
200 this.aiocb.aio_nbytes = slice.len();
201
202 Ok(Decision::aio(&mut this.aiocb, libc::aio_read))
203 }
204 #[cfg(not(aio))]
205 {
206 Ok(Decision::Blocking)
207 }
208 }
209
210 #[cfg(aio)]
211 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
212 Some(OpType::Aio(NonNull::from(
213 &mut unsafe { self.get_unchecked_mut() }.aiocb,
214 )))
215 }
216
217 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
218 let fd = self.fd.as_fd().as_raw_fd();
219 let offset = self.offset;
220 let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
221 syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, offset as _,))
222 }
223}
224
225impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
226 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
227 #[cfg(freebsd)]
228 {
229 let this = unsafe { self.get_unchecked_mut() };
230 this.slices = unsafe { this.buffer.io_slices_mut() };
231
232 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
233 this.aiocb.aio_offset = this.offset as _;
234 this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
235 this.aiocb.aio_nbytes = this.slices.len();
236
237 Ok(Decision::aio(&mut this.aiocb, libc::aio_readv))
238 }
239 #[cfg(not(freebsd))]
240 {
241 Ok(Decision::Blocking)
242 }
243 }
244
245 #[cfg(freebsd)]
246 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
247 Some(OpType::Aio(NonNull::from(
248 &mut unsafe { self.get_unchecked_mut() }.aiocb,
249 )))
250 }
251
252 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
253 let this = unsafe { self.get_unchecked_mut() };
254 this.slices = unsafe { this.buffer.io_slices_mut() };
255 syscall!(
256 break preadv(
257 this.fd.as_fd().as_raw_fd(),
258 this.slices.as_ptr() as _,
259 this.slices.len() as _,
260 this.offset as _,
261 )
262 )
263 }
264}
265
266impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
267 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
268 #[cfg(aio)]
269 {
270 let this = unsafe { self.get_unchecked_mut() };
271 let slice = this.buffer.as_slice();
272
273 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
274 this.aiocb.aio_offset = this.offset as _;
275 this.aiocb.aio_buf = slice.as_ptr().cast_mut().cast();
276 this.aiocb.aio_nbytes = slice.len();
277
278 Ok(Decision::aio(&mut this.aiocb, libc::aio_write))
279 }
280 #[cfg(not(aio))]
281 {
282 Ok(Decision::Blocking)
283 }
284 }
285
286 #[cfg(aio)]
287 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
288 Some(OpType::Aio(NonNull::from(
289 &mut unsafe { self.get_unchecked_mut() }.aiocb,
290 )))
291 }
292
293 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
294 let slice = self.buffer.as_slice();
295 syscall!(
296 break pwrite(
297 self.fd.as_fd().as_raw_fd(),
298 slice.as_ptr() as _,
299 slice.len() as _,
300 self.offset as _,
301 )
302 )
303 }
304}
305
306impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
307 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
308 #[cfg(freebsd)]
309 {
310 let this = unsafe { self.get_unchecked_mut() };
311 this.slices = unsafe { this.buffer.io_slices() };
312
313 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
314 this.aiocb.aio_offset = this.offset as _;
315 this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
316 this.aiocb.aio_nbytes = this.slices.len();
317
318 Ok(Decision::aio(&mut this.aiocb, libc::aio_writev))
319 }
320 #[cfg(not(freebsd))]
321 {
322 Ok(Decision::Blocking)
323 }
324 }
325
326 #[cfg(freebsd)]
327 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
328 Some(OpType::Aio(NonNull::from(
329 &mut unsafe { self.get_unchecked_mut() }.aiocb,
330 )))
331 }
332
333 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
334 let this = unsafe { self.get_unchecked_mut() };
335 this.slices = unsafe { this.buffer.io_slices() };
336 syscall!(
337 break pwritev(
338 this.fd.as_fd().as_raw_fd(),
339 this.slices.as_ptr() as _,
340 this.slices.len() as _,
341 this.offset as _,
342 )
343 )
344 }
345}
346
347impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
348 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
349 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
350 }
351
352 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
353 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
354 }
355
356 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
357 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
358 }
359}
360
361impl<S: AsFd> OpCode for Sync<S> {
362 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
363 #[cfg(aio)]
364 {
365 unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
366 libc::aio_fsync(libc::O_SYNC, aiocbp)
367 }
368 unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
369 libc::aio_fsync(libc::O_DSYNC, aiocbp)
370 }
371
372 let this = unsafe { self.get_unchecked_mut() };
373 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
374
375 let f = if this.datasync {
376 aio_fdatasync
377 } else {
378 aio_fsync
379 };
380
381 Ok(Decision::aio(&mut this.aiocb, f))
382 }
383 #[cfg(not(aio))]
384 {
385 Ok(Decision::Blocking)
386 }
387 }
388
389 #[cfg(aio)]
390 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
391 Some(OpType::Aio(NonNull::from(
392 &mut unsafe { self.get_unchecked_mut() }.aiocb,
393 )))
394 }
395
396 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
397 #[cfg(datasync)]
398 {
399 Poll::Ready(Ok(syscall!(if self.datasync {
400 libc::fdatasync(self.fd.as_fd().as_raw_fd())
401 } else {
402 libc::fsync(self.fd.as_fd().as_raw_fd())
403 })? as _))
404 }
405 #[cfg(not(datasync))]
406 {
407 Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
408 }
409 }
410}
411
412impl OpCode for Unlink {
413 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
414 Ok(Decision::Blocking)
415 }
416
417 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
418 if self.dir {
419 syscall!(libc::rmdir(self.path.as_ptr()))?;
420 } else {
421 syscall!(libc::unlink(self.path.as_ptr()))?;
422 }
423 Poll::Ready(Ok(0))
424 }
425}
426
427impl OpCode for CreateDir {
428 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
429 Ok(Decision::Blocking)
430 }
431
432 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
433 syscall!(libc::mkdir(self.path.as_ptr(), self.mode))?;
434 Poll::Ready(Ok(0))
435 }
436}
437
438impl OpCode for Rename {
439 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
440 Ok(Decision::Blocking)
441 }
442
443 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
444 syscall!(libc::rename(self.old_path.as_ptr(), self.new_path.as_ptr()))?;
445 Poll::Ready(Ok(0))
446 }
447}
448
449impl OpCode for Symlink {
450 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
451 Ok(Decision::Blocking)
452 }
453
454 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
455 syscall!(libc::symlink(self.source.as_ptr(), self.target.as_ptr()))?;
456 Poll::Ready(Ok(0))
457 }
458}
459
460impl OpCode for HardLink {
461 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
462 Ok(Decision::Blocking)
463 }
464
465 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
466 syscall!(libc::link(self.source.as_ptr(), self.target.as_ptr()))?;
467 Poll::Ready(Ok(0))
468 }
469}
470
471impl CreateSocket {
472 unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
473 #[allow(unused_mut)]
474 let mut ty: i32 = self.socket_type;
475 #[cfg(any(
476 target_os = "android",
477 target_os = "dragonfly",
478 target_os = "freebsd",
479 target_os = "fuchsia",
480 target_os = "hurd",
481 target_os = "illumos",
482 target_os = "linux",
483 target_os = "netbsd",
484 target_os = "openbsd",
485 target_os = "cygwin",
486 ))]
487 {
488 ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
489 }
490 let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
491 let socket = Socket2::from_raw_fd(fd);
492 #[cfg(not(any(
493 target_os = "android",
494 target_os = "dragonfly",
495 target_os = "freebsd",
496 target_os = "fuchsia",
497 target_os = "hurd",
498 target_os = "illumos",
499 target_os = "linux",
500 target_os = "netbsd",
501 target_os = "openbsd",
502 target_os = "espidf",
503 target_os = "vita",
504 target_os = "cygwin",
505 )))]
506 socket.set_cloexec(true)?;
507 #[cfg(any(
508 target_os = "ios",
509 target_os = "macos",
510 target_os = "tvos",
511 target_os = "watchos",
512 ))]
513 socket.set_nosigpipe(true)?;
514 #[cfg(not(any(
515 target_os = "android",
516 target_os = "dragonfly",
517 target_os = "freebsd",
518 target_os = "fuchsia",
519 target_os = "hurd",
520 target_os = "illumos",
521 target_os = "linux",
522 target_os = "netbsd",
523 target_os = "openbsd",
524 target_os = "cygwin",
525 )))]
526 socket.set_nonblocking(true)?;
527 Ok(socket.into_raw_fd())
528 }
529}
530
531impl OpCode for CreateSocket {
532 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
533 Ok(Decision::Blocking)
534 }
535
536 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
537 Poll::Ready(Ok(unsafe { self.call()? } as _))
538 }
539}
540
541impl<S: AsFd> OpCode for ShutdownSocket<S> {
542 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
543 Ok(Decision::Blocking)
544 }
545
546 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
547 Poll::Ready(Ok(
548 syscall!(libc::shutdown(self.fd.as_fd().as_raw_fd(), self.how()))? as _,
549 ))
550 }
551}
552
553impl OpCode for CloseSocket {
554 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
555 Ok(Decision::Blocking)
556 }
557
558 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
559 Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
560 }
561}
562
563impl<S: AsFd> Accept<S> {
564 unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
565 let this = self.get_unchecked_mut();
566 #[cfg(any(
567 target_os = "android",
568 target_os = "dragonfly",
569 target_os = "freebsd",
570 target_os = "fuchsia",
571 target_os = "illumos",
572 target_os = "linux",
573 target_os = "netbsd",
574 target_os = "openbsd",
575 target_os = "cygwin",
576 ))]
577 {
578 libc::accept4(
579 this.fd.as_fd().as_raw_fd(),
580 &mut this.buffer as *mut _ as *mut _,
581 &mut this.addr_len,
582 libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
583 )
584 }
585 #[cfg(not(any(
586 target_os = "android",
587 target_os = "dragonfly",
588 target_os = "freebsd",
589 target_os = "fuchsia",
590 target_os = "illumos",
591 target_os = "linux",
592 target_os = "netbsd",
593 target_os = "openbsd",
594 target_os = "cygwin",
595 )))]
596 {
597 || -> io::Result<libc::c_int> {
598 let fd = syscall!(libc::accept(
599 this.fd.as_fd().as_raw_fd(),
600 &mut this.buffer as *mut _ as *mut _,
601 &mut this.addr_len,
602 ))?;
603 let socket = Socket2::from_raw_fd(fd);
604 socket.set_cloexec(true)?;
605 socket.set_nonblocking(true)?;
606 Ok(socket.into_raw_fd())
607 }()
608 .unwrap_or(-1)
609 }
610 }
611}
612
613impl<S: AsFd> OpCode for Accept<S> {
614 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
615 let fd = self.fd.as_fd().as_raw_fd();
616 syscall!(self.as_mut().call(), wait_readable(fd))
617 }
618
619 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
620 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
621 }
622
623 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
624 let res = syscall!(break self.as_mut().call());
625 if let Poll::Ready(Ok(fd)) = res {
626 unsafe {
627 self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
628 }
629 }
630 res
631 }
632}
633
634impl<S: AsFd> OpCode for Connect<S> {
635 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
636 syscall!(
637 libc::connect(
638 self.fd.as_fd().as_raw_fd(),
639 self.addr.as_ptr().cast(),
640 self.addr.len()
641 ),
642 wait_writable(self.fd.as_fd().as_raw_fd())
643 )
644 }
645
646 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
647 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
648 }
649
650 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
651 let mut err: libc::c_int = 0;
652 let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
653
654 syscall!(libc::getsockopt(
655 self.fd.as_fd().as_raw_fd(),
656 libc::SOL_SOCKET,
657 libc::SO_ERROR,
658 &mut err as *mut _ as *mut _,
659 &mut err_len
660 ))?;
661
662 let res = if err == 0 {
663 Ok(0)
664 } else {
665 Err(io::Error::from_raw_os_error(err))
666 };
667 Poll::Ready(res)
668 }
669}
670
671impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
672 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
673 Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
674 }
675
676 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
677 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
678 }
679
680 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
681 let fd = self.fd.as_fd().as_raw_fd();
682 let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
683 syscall!(break libc::read(fd, slice.as_mut_ptr() as _, slice.len()))
684 }
685}
686
687impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
688 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
689 Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
690 }
691
692 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
693 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
694 }
695
696 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
697 let this = unsafe { self.get_unchecked_mut() };
698 this.slices = unsafe { this.buffer.io_slices_mut() };
699 syscall!(
700 break libc::readv(
701 this.fd.as_fd().as_raw_fd(),
702 this.slices.as_ptr() as _,
703 this.slices.len() as _
704 )
705 )
706 }
707}
708
709impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
710 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
711 Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
712 }
713
714 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
715 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
716 }
717
718 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
719 let slice = self.buffer.as_slice();
720 syscall!(
721 break libc::write(
722 self.fd.as_fd().as_raw_fd(),
723 slice.as_ptr() as _,
724 slice.len()
725 )
726 )
727 }
728}
729
730impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
731 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
732 Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
733 }
734
735 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
736 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
737 }
738
739 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
740 let this = unsafe { self.get_unchecked_mut() };
741 this.slices = unsafe { this.buffer.io_slices() };
742 syscall!(
743 break libc::writev(
744 this.fd.as_fd().as_raw_fd(),
745 this.slices.as_ptr() as _,
746 this.slices.len() as _
747 )
748 )
749 }
750}
751
752impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
753 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
754 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
755 }
756
757 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
758 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
759 }
760
761 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
762 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
763 }
764}
765
766pub struct RecvFrom<T: IoBufMut, S> {
768 pub(crate) fd: S,
769 pub(crate) buffer: T,
770 pub(crate) addr: SockAddrStorage,
771 pub(crate) addr_len: socklen_t,
772 _p: PhantomPinned,
773}
774
775impl<T: IoBufMut, S> RecvFrom<T, S> {
776 pub fn new(fd: S, buffer: T) -> Self {
778 let addr = SockAddrStorage::zeroed();
779 let addr_len = addr.size_of();
780 Self {
781 fd,
782 buffer,
783 addr,
784 addr_len,
785 _p: PhantomPinned,
786 }
787 }
788}
789
790impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
791 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
792 let this = self.get_unchecked_mut();
793 let fd = this.fd.as_fd().as_raw_fd();
794 let slice = this.buffer.as_mut_slice();
795 libc::recvfrom(
796 fd,
797 slice.as_mut_ptr() as _,
798 slice.len(),
799 0,
800 &mut this.addr as *mut _ as _,
801 &mut this.addr_len,
802 )
803 }
804}
805
806impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
807 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
808 let fd = self.fd.as_fd().as_raw_fd();
809 syscall!(self.as_mut().call(), wait_readable(fd))
810 }
811
812 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
813 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
814 }
815
816 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
817 syscall!(break self.as_mut().call())
818 }
819}
820
821impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
822 type Inner = (T, SockAddrStorage, socklen_t);
823
824 fn into_inner(self) -> Self::Inner {
825 (self.buffer, self.addr, self.addr_len)
826 }
827}
828
829pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
831 pub(crate) fd: S,
832 pub(crate) buffer: T,
833 pub(crate) slices: Vec<IoSliceMut>,
834 pub(crate) addr: SockAddrStorage,
835 pub(crate) msg: libc::msghdr,
836 _p: PhantomPinned,
837}
838
839impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
840 pub fn new(fd: S, buffer: T) -> Self {
842 Self {
843 fd,
844 buffer,
845 slices: vec![],
846 addr: SockAddrStorage::zeroed(),
847 msg: unsafe { std::mem::zeroed() },
848 _p: PhantomPinned,
849 }
850 }
851}
852
853impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
854 fn set_msg(&mut self) {
855 self.slices = unsafe { self.buffer.io_slices_mut() };
856 self.msg.msg_name = &mut self.addr as *mut _ as _;
857 self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
858 self.msg.msg_iov = self.slices.as_mut_ptr() as _;
859 self.msg.msg_iovlen = self.slices.len() as _;
860 }
861
862 unsafe fn call(&mut self) -> libc::ssize_t {
863 libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0)
864 }
865}
866
867impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
868 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
869 let this = unsafe { self.get_unchecked_mut() };
870 this.set_msg();
871 syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
872 }
873
874 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
875 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
876 }
877
878 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
879 let this = unsafe { self.get_unchecked_mut() };
880 syscall!(break this.call())
881 }
882}
883
884impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
885 type Inner = (T, SockAddrStorage, socklen_t);
886
887 fn into_inner(self) -> Self::Inner {
888 (self.buffer, self.addr, self.msg.msg_namelen)
889 }
890}
891
892pub struct SendTo<T: IoBuf, S> {
894 pub(crate) fd: S,
895 pub(crate) buffer: T,
896 pub(crate) addr: SockAddr,
897 _p: PhantomPinned,
898}
899
900impl<T: IoBuf, S> SendTo<T, S> {
901 pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
903 Self {
904 fd,
905 buffer,
906 addr,
907 _p: PhantomPinned,
908 }
909 }
910}
911
912impl<T: IoBuf, S: AsFd> SendTo<T, S> {
913 unsafe fn call(&self) -> libc::ssize_t {
914 let slice = self.buffer.as_slice();
915 libc::sendto(
916 self.fd.as_fd().as_raw_fd(),
917 slice.as_ptr() as _,
918 slice.len(),
919 0,
920 self.addr.as_ptr().cast(),
921 self.addr.len(),
922 )
923 }
924}
925
926impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
927 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
928 syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
929 }
930
931 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
932 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
933 }
934
935 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
936 syscall!(break self.call())
937 }
938}
939
940impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
941 type Inner = T;
942
943 fn into_inner(self) -> Self::Inner {
944 self.buffer
945 }
946}
947
948pub struct SendToVectored<T: IoVectoredBuf, S> {
950 pub(crate) fd: S,
951 pub(crate) buffer: T,
952 pub(crate) addr: SockAddr,
953 pub(crate) slices: Vec<IoSlice>,
954 pub(crate) msg: libc::msghdr,
955 _p: PhantomPinned,
956}
957
958impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
959 pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
961 Self {
962 fd,
963 buffer,
964 addr,
965 slices: vec![],
966 msg: unsafe { std::mem::zeroed() },
967 _p: PhantomPinned,
968 }
969 }
970}
971
972impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
973 fn set_msg(&mut self) {
974 self.slices = unsafe { self.buffer.io_slices() };
975 self.msg.msg_name = &mut self.addr as *mut _ as _;
976 self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
977 self.msg.msg_iov = self.slices.as_mut_ptr() as _;
978 self.msg.msg_iovlen = self.slices.len() as _;
979 }
980
981 unsafe fn call(&self) -> libc::ssize_t {
982 libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0)
983 }
984}
985
986impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
987 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
988 let this = unsafe { self.get_unchecked_mut() };
989 this.set_msg();
990 syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
991 }
992
993 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
994 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
995 }
996
997 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
998 syscall!(break self.call())
999 }
1000}
1001
1002impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1003 type Inner = T;
1004
1005 fn into_inner(self) -> Self::Inner {
1006 self.buffer
1007 }
1008}
1009
1010impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1011 unsafe fn call(&mut self) -> libc::ssize_t {
1012 libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0)
1013 }
1014}
1015
1016impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1017 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1018 let this = unsafe { self.get_unchecked_mut() };
1019 unsafe { this.set_msg() };
1020 syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
1021 }
1022
1023 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1024 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1025 }
1026
1027 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1028 let this = unsafe { self.get_unchecked_mut() };
1029 syscall!(break this.call())
1030 }
1031}
1032
1033impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1034 unsafe fn call(&self) -> libc::ssize_t {
1035 libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0)
1036 }
1037}
1038
1039impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1040 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1041 let this = unsafe { self.get_unchecked_mut() };
1042 unsafe { this.set_msg() };
1043 syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
1044 }
1045
1046 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1047 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1048 }
1049
1050 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1051 syscall!(break self.call())
1052 }
1053}
1054
1055impl<S: AsFd> OpCode for PollOnce<S> {
1056 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1057 Ok(Decision::wait_for(
1058 self.fd.as_fd().as_raw_fd(),
1059 self.interest,
1060 ))
1061 }
1062
1063 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1064 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1065 }
1066
1067 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1068 Poll::Ready(Ok(0))
1069 }
1070}