1#[cfg(aio)]
2use std::ptr::NonNull;
3use std::{
4 ffi::CString,
5 io,
6 marker::PhantomPinned,
7 os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd},
8 pin::Pin,
9 task::Poll,
10};
11
12use compio_buf::{
13 BufResult, IntoInner, IoBuf, IoBufMut, IoSlice, IoSliceMut, IoVectoredBuf, IoVectoredBufMut,
14};
15#[cfg(not(gnulinux))]
16use libc::open;
17#[cfg(gnulinux)]
18use libc::open64 as open;
19#[cfg(not(any(target_os = "linux", target_os = "android", target_os = "hurd")))]
20use libc::{pread, preadv, pwrite, pwritev};
21#[cfg(any(target_os = "linux", target_os = "android", target_os = "hurd"))]
22use libc::{pread64 as pread, preadv64 as preadv, pwrite64 as pwrite, pwritev64 as pwritev};
23use socket2::{SockAddr, SockAddrStorage, Socket as Socket2, socklen_t};
24
25use super::{AsFd, Decision, OpCode, OpType, syscall};
26use crate::op::*;
27pub use crate::unix::op::*;
28
29impl<
30 D: std::marker::Send + 'static,
31 F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + 'static,
32> OpCode for Asyncify<F, D>
33{
34 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
35 Ok(Decision::Blocking)
36 }
37
38 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
39 let this = unsafe { self.get_unchecked_mut() };
41 let f = this
42 .f
43 .take()
44 .expect("the operate method could only be called once");
45 let BufResult(res, data) = f();
46 this.data = Some(data);
47 Poll::Ready(res)
48 }
49}
50
51impl<
52 S,
53 D: std::marker::Send + 'static,
54 F: (FnOnce(&S) -> BufResult<usize, D>) + std::marker::Send + 'static,
55> OpCode for AsyncifyFd<S, F, D>
56{
57 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
58 Ok(Decision::Blocking)
59 }
60
61 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
62 let this = unsafe { self.get_unchecked_mut() };
64 let f = this
65 .f
66 .take()
67 .expect("the operate method could only be called once");
68 let BufResult(res, data) = f(&this.fd);
69 this.data = Some(data);
70 Poll::Ready(res)
71 }
72}
73
74impl OpCode for OpenFile {
75 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
76 Ok(Decision::Blocking)
77 }
78
79 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
80 Poll::Ready(Ok(syscall!(open(
81 self.path.as_ptr(),
82 self.flags | libc::O_CLOEXEC,
83 self.mode as libc::c_int
84 ))? as _))
85 }
86}
87
88impl OpCode for CloseFile {
89 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
90 Ok(Decision::Blocking)
91 }
92
93 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
94 Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
95 }
96}
97
98pub struct FileStat<S> {
100 pub(crate) fd: S,
101 pub(crate) stat: libc::stat,
102}
103
104impl<S> FileStat<S> {
105 pub fn new(fd: S) -> Self {
107 Self {
108 fd,
109 stat: unsafe { std::mem::zeroed() },
110 }
111 }
112}
113
114impl<S: AsFd> OpCode for FileStat<S> {
115 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
116 Ok(Decision::Blocking)
117 }
118
119 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
120 let this = unsafe { self.get_unchecked_mut() };
121 #[cfg(gnulinux)]
122 {
123 let mut s: libc::statx = unsafe { std::mem::zeroed() };
124 static EMPTY_NAME: &[u8] = b"\0";
125 syscall!(libc::statx(
126 this.fd.as_fd().as_raw_fd(),
127 EMPTY_NAME.as_ptr().cast(),
128 libc::AT_EMPTY_PATH,
129 0,
130 &mut s
131 ))?;
132 this.stat = statx_to_stat(s);
133 Poll::Ready(Ok(0))
134 }
135 #[cfg(not(gnulinux))]
136 {
137 Poll::Ready(Ok(
138 syscall!(libc::fstat(this.fd.as_fd().as_raw_fd(), &mut this.stat))? as _,
139 ))
140 }
141 }
142}
143
144impl<S> IntoInner for FileStat<S> {
145 type Inner = libc::stat;
146
147 fn into_inner(self) -> Self::Inner {
148 self.stat
149 }
150}
151
152pub struct PathStat {
154 pub(crate) path: CString,
155 pub(crate) stat: libc::stat,
156 pub(crate) follow_symlink: bool,
157}
158
159impl PathStat {
160 pub fn new(path: CString, follow_symlink: bool) -> Self {
162 Self {
163 path,
164 stat: unsafe { std::mem::zeroed() },
165 follow_symlink,
166 }
167 }
168}
169
170impl OpCode for PathStat {
171 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
172 Ok(Decision::Blocking)
173 }
174
175 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
176 #[cfg(gnulinux)]
177 {
178 let mut flags = libc::AT_EMPTY_PATH;
179 if !self.follow_symlink {
180 flags |= libc::AT_SYMLINK_NOFOLLOW;
181 }
182 let mut s: libc::statx = unsafe { std::mem::zeroed() };
183 syscall!(libc::statx(
184 libc::AT_FDCWD,
185 self.path.as_ptr(),
186 flags,
187 0,
188 &mut s
189 ))?;
190 self.stat = statx_to_stat(s);
191 Poll::Ready(Ok(0))
192 }
193 #[cfg(not(gnulinux))]
194 {
195 let f = if self.follow_symlink {
196 libc::stat
197 } else {
198 libc::lstat
199 };
200 Poll::Ready(Ok(syscall!(f(self.path.as_ptr(), &mut self.stat))? as _))
201 }
202 }
203}
204
205impl IntoInner for PathStat {
206 type Inner = libc::stat;
207
208 fn into_inner(self) -> Self::Inner {
209 self.stat
210 }
211}
212
213impl<T: IoBufMut, S: AsFd> OpCode for ReadAt<T, S> {
214 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
215 #[cfg(aio)]
216 {
217 let this = unsafe { self.get_unchecked_mut() };
218 let slice = this.buffer.as_mut_slice();
219
220 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
221 this.aiocb.aio_offset = this.offset as _;
222 this.aiocb.aio_buf = slice.as_mut_ptr().cast();
223 this.aiocb.aio_nbytes = slice.len();
224
225 Ok(Decision::aio(&mut this.aiocb, libc::aio_read))
226 }
227 #[cfg(not(aio))]
228 {
229 Ok(Decision::Blocking)
230 }
231 }
232
233 #[cfg(aio)]
234 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
235 Some(OpType::Aio(NonNull::from(
236 &mut unsafe { self.get_unchecked_mut() }.aiocb,
237 )))
238 }
239
240 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
241 let fd = self.fd.as_fd().as_raw_fd();
242 let offset = self.offset;
243 let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
244 syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, offset as _,))
245 }
246}
247
248impl<T: IoVectoredBufMut, S: AsFd> OpCode for ReadVectoredAt<T, S> {
249 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
250 #[cfg(freebsd)]
251 {
252 let this = unsafe { self.get_unchecked_mut() };
253 this.slices = unsafe { this.buffer.io_slices_mut() };
254
255 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
256 this.aiocb.aio_offset = this.offset as _;
257 this.aiocb.aio_buf = this.slices.as_mut_ptr().cast();
258 this.aiocb.aio_nbytes = this.slices.len();
259
260 Ok(Decision::aio(&mut this.aiocb, libc::aio_readv))
261 }
262 #[cfg(not(freebsd))]
263 {
264 Ok(Decision::Blocking)
265 }
266 }
267
268 #[cfg(freebsd)]
269 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
270 Some(OpType::Aio(NonNull::from(
271 &mut unsafe { self.get_unchecked_mut() }.aiocb,
272 )))
273 }
274
275 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
276 let this = unsafe { self.get_unchecked_mut() };
277 this.slices = unsafe { this.buffer.io_slices_mut() };
278 syscall!(
279 break preadv(
280 this.fd.as_fd().as_raw_fd(),
281 this.slices.as_ptr() as _,
282 this.slices.len() as _,
283 this.offset as _,
284 )
285 )
286 }
287}
288
289impl<T: IoBuf, S: AsFd> OpCode for WriteAt<T, S> {
290 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
291 #[cfg(aio)]
292 {
293 let this = unsafe { self.get_unchecked_mut() };
294 let slice = this.buffer.as_slice();
295
296 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
297 this.aiocb.aio_offset = this.offset as _;
298 this.aiocb.aio_buf = slice.as_ptr().cast_mut().cast();
299 this.aiocb.aio_nbytes = slice.len();
300
301 Ok(Decision::aio(&mut this.aiocb, libc::aio_write))
302 }
303 #[cfg(not(aio))]
304 {
305 Ok(Decision::Blocking)
306 }
307 }
308
309 #[cfg(aio)]
310 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
311 Some(OpType::Aio(NonNull::from(
312 &mut unsafe { self.get_unchecked_mut() }.aiocb,
313 )))
314 }
315
316 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
317 let slice = self.buffer.as_slice();
318 syscall!(
319 break pwrite(
320 self.fd.as_fd().as_raw_fd(),
321 slice.as_ptr() as _,
322 slice.len() as _,
323 self.offset as _,
324 )
325 )
326 }
327}
328
329impl<T: IoVectoredBuf, S: AsFd> OpCode for WriteVectoredAt<T, S> {
330 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
331 #[cfg(freebsd)]
332 {
333 let this = unsafe { self.get_unchecked_mut() };
334 this.slices = unsafe { this.buffer.io_slices() };
335
336 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
337 this.aiocb.aio_offset = this.offset as _;
338 this.aiocb.aio_buf = this.slices.as_ptr().cast_mut().cast();
339 this.aiocb.aio_nbytes = this.slices.len();
340
341 Ok(Decision::aio(&mut this.aiocb, libc::aio_writev))
342 }
343 #[cfg(not(freebsd))]
344 {
345 Ok(Decision::Blocking)
346 }
347 }
348
349 #[cfg(freebsd)]
350 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
351 Some(OpType::Aio(NonNull::from(
352 &mut unsafe { self.get_unchecked_mut() }.aiocb,
353 )))
354 }
355
356 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
357 let this = unsafe { self.get_unchecked_mut() };
358 this.slices = unsafe { this.buffer.io_slices() };
359 syscall!(
360 break pwritev(
361 this.fd.as_fd().as_raw_fd(),
362 this.slices.as_ptr() as _,
363 this.slices.len() as _,
364 this.offset as _,
365 )
366 )
367 }
368}
369
370impl<S: AsFd> OpCode for crate::op::managed::ReadManagedAt<S> {
371 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
372 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
373 }
374
375 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
376 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
377 }
378
379 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
380 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
381 }
382}
383
384impl<S: AsFd> OpCode for Sync<S> {
385 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
386 #[cfg(aio)]
387 {
388 unsafe extern "C" fn aio_fsync(aiocbp: *mut libc::aiocb) -> i32 {
389 libc::aio_fsync(libc::O_SYNC, aiocbp)
390 }
391 unsafe extern "C" fn aio_fdatasync(aiocbp: *mut libc::aiocb) -> i32 {
392 libc::aio_fsync(libc::O_DSYNC, aiocbp)
393 }
394
395 let this = unsafe { self.get_unchecked_mut() };
396 this.aiocb.aio_fildes = this.fd.as_fd().as_raw_fd();
397
398 let f = if this.datasync {
399 aio_fdatasync
400 } else {
401 aio_fsync
402 };
403
404 Ok(Decision::aio(&mut this.aiocb, f))
405 }
406 #[cfg(not(aio))]
407 {
408 Ok(Decision::Blocking)
409 }
410 }
411
412 #[cfg(aio)]
413 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
414 Some(OpType::Aio(NonNull::from(
415 &mut unsafe { self.get_unchecked_mut() }.aiocb,
416 )))
417 }
418
419 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
420 #[cfg(datasync)]
421 {
422 Poll::Ready(Ok(syscall!(if self.datasync {
423 libc::fdatasync(self.fd.as_fd().as_raw_fd())
424 } else {
425 libc::fsync(self.fd.as_fd().as_raw_fd())
426 })? as _))
427 }
428 #[cfg(not(datasync))]
429 {
430 Poll::Ready(Ok(syscall!(libc::fsync(self.fd.as_fd().as_raw_fd()))? as _))
431 }
432 }
433}
434
435impl OpCode for Unlink {
436 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
437 Ok(Decision::Blocking)
438 }
439
440 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
441 if self.dir {
442 syscall!(libc::rmdir(self.path.as_ptr()))?;
443 } else {
444 syscall!(libc::unlink(self.path.as_ptr()))?;
445 }
446 Poll::Ready(Ok(0))
447 }
448}
449
450impl OpCode for CreateDir {
451 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
452 Ok(Decision::Blocking)
453 }
454
455 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
456 syscall!(libc::mkdir(self.path.as_ptr(), self.mode))?;
457 Poll::Ready(Ok(0))
458 }
459}
460
461impl OpCode for Rename {
462 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
463 Ok(Decision::Blocking)
464 }
465
466 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
467 syscall!(libc::rename(self.old_path.as_ptr(), self.new_path.as_ptr()))?;
468 Poll::Ready(Ok(0))
469 }
470}
471
472impl OpCode for Symlink {
473 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
474 Ok(Decision::Blocking)
475 }
476
477 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
478 syscall!(libc::symlink(self.source.as_ptr(), self.target.as_ptr()))?;
479 Poll::Ready(Ok(0))
480 }
481}
482
483impl OpCode for HardLink {
484 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
485 Ok(Decision::Blocking)
486 }
487
488 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
489 syscall!(libc::link(self.source.as_ptr(), self.target.as_ptr()))?;
490 Poll::Ready(Ok(0))
491 }
492}
493
494impl CreateSocket {
495 unsafe fn call(self: Pin<&mut Self>) -> io::Result<libc::c_int> {
496 #[allow(unused_mut)]
497 let mut ty: i32 = self.socket_type;
498 #[cfg(any(
499 target_os = "android",
500 target_os = "dragonfly",
501 target_os = "freebsd",
502 target_os = "fuchsia",
503 target_os = "hurd",
504 target_os = "illumos",
505 target_os = "linux",
506 target_os = "netbsd",
507 target_os = "openbsd",
508 target_os = "cygwin",
509 ))]
510 {
511 ty |= libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
512 }
513 let fd = syscall!(libc::socket(self.domain, ty, self.protocol))?;
514 let socket = unsafe { Socket2::from_raw_fd(fd) };
515 #[cfg(not(any(
516 target_os = "android",
517 target_os = "dragonfly",
518 target_os = "freebsd",
519 target_os = "fuchsia",
520 target_os = "hurd",
521 target_os = "illumos",
522 target_os = "linux",
523 target_os = "netbsd",
524 target_os = "openbsd",
525 target_os = "espidf",
526 target_os = "vita",
527 target_os = "cygwin",
528 )))]
529 socket.set_cloexec(true)?;
530 #[cfg(any(
531 target_os = "ios",
532 target_os = "macos",
533 target_os = "tvos",
534 target_os = "watchos",
535 ))]
536 socket.set_nosigpipe(true)?;
537 #[cfg(not(any(
538 target_os = "android",
539 target_os = "dragonfly",
540 target_os = "freebsd",
541 target_os = "fuchsia",
542 target_os = "hurd",
543 target_os = "illumos",
544 target_os = "linux",
545 target_os = "netbsd",
546 target_os = "openbsd",
547 target_os = "cygwin",
548 )))]
549 socket.set_nonblocking(true)?;
550 Ok(socket.into_raw_fd())
551 }
552}
553
554impl OpCode for CreateSocket {
555 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
556 Ok(Decision::Blocking)
557 }
558
559 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
560 Poll::Ready(Ok(unsafe { self.call()? } as _))
561 }
562}
563
564impl<S: AsFd> OpCode for ShutdownSocket<S> {
565 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
566 Ok(Decision::Blocking)
567 }
568
569 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
570 Poll::Ready(Ok(
571 syscall!(libc::shutdown(self.fd.as_fd().as_raw_fd(), self.how()))? as _,
572 ))
573 }
574}
575
576impl OpCode for CloseSocket {
577 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
578 Ok(Decision::Blocking)
579 }
580
581 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
582 Poll::Ready(Ok(syscall!(libc::close(self.fd.as_fd().as_raw_fd()))? as _))
583 }
584}
585
586impl<S: AsFd> Accept<S> {
587 unsafe fn call(self: Pin<&mut Self>) -> libc::c_int {
588 let this = unsafe { self.get_unchecked_mut() };
589 #[cfg(any(
590 target_os = "android",
591 target_os = "dragonfly",
592 target_os = "freebsd",
593 target_os = "fuchsia",
594 target_os = "illumos",
595 target_os = "linux",
596 target_os = "netbsd",
597 target_os = "openbsd",
598 target_os = "cygwin",
599 ))]
600 unsafe {
601 libc::accept4(
602 this.fd.as_fd().as_raw_fd(),
603 &mut this.buffer as *mut _ as *mut _,
604 &mut this.addr_len,
605 libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC,
606 )
607 }
608 #[cfg(not(any(
609 target_os = "android",
610 target_os = "dragonfly",
611 target_os = "freebsd",
612 target_os = "fuchsia",
613 target_os = "illumos",
614 target_os = "linux",
615 target_os = "netbsd",
616 target_os = "openbsd",
617 target_os = "cygwin",
618 )))]
619 {
620 || -> io::Result<libc::c_int> {
621 let fd = syscall!(libc::accept(
622 this.fd.as_fd().as_raw_fd(),
623 &mut this.buffer as *mut _ as *mut _,
624 &mut this.addr_len,
625 ))?;
626 let socket = unsafe { Socket2::from_raw_fd(fd) };
627 socket.set_cloexec(true)?;
628 socket.set_nonblocking(true)?;
629 Ok(socket.into_raw_fd())
630 }()
631 .unwrap_or(-1)
632 }
633 }
634}
635
636impl<S: AsFd> OpCode for Accept<S> {
637 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
638 let fd = self.fd.as_fd().as_raw_fd();
639 syscall!(self.as_mut().call(), wait_readable(fd))
640 }
641
642 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
643 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
644 }
645
646 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
647 let res = syscall!(break self.as_mut().call());
648 if let Poll::Ready(Ok(fd)) = res {
649 unsafe {
650 self.get_unchecked_mut().accepted_fd = Some(OwnedFd::from_raw_fd(fd as _));
651 }
652 }
653 res
654 }
655}
656
657impl<S: AsFd> OpCode for Connect<S> {
658 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
659 syscall!(
660 libc::connect(
661 self.fd.as_fd().as_raw_fd(),
662 self.addr.as_ptr().cast(),
663 self.addr.len()
664 ),
665 wait_writable(self.fd.as_fd().as_raw_fd())
666 )
667 }
668
669 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
670 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
671 }
672
673 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
674 let mut err: libc::c_int = 0;
675 let mut err_len = std::mem::size_of::<libc::c_int>() as libc::socklen_t;
676
677 syscall!(libc::getsockopt(
678 self.fd.as_fd().as_raw_fd(),
679 libc::SOL_SOCKET,
680 libc::SO_ERROR,
681 &mut err as *mut _ as *mut _,
682 &mut err_len
683 ))?;
684
685 let res = if err == 0 {
686 Ok(0)
687 } else {
688 Err(io::Error::from_raw_os_error(err))
689 };
690 Poll::Ready(res)
691 }
692}
693
694impl<T: IoBufMut, S: AsFd> OpCode for Recv<T, S> {
695 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
696 Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
697 }
698
699 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
700 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
701 }
702
703 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
704 let fd = self.fd.as_fd().as_raw_fd();
705 let slice = unsafe { self.get_unchecked_mut() }.buffer.as_mut_slice();
706 syscall!(break libc::read(fd, slice.as_mut_ptr() as _, slice.len()))
707 }
708}
709
710impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvVectored<T, S> {
711 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
712 Ok(Decision::wait_readable(self.fd.as_fd().as_raw_fd()))
713 }
714
715 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
716 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
717 }
718
719 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
720 let this = unsafe { self.get_unchecked_mut() };
721 this.slices = unsafe { this.buffer.io_slices_mut() };
722 syscall!(
723 break libc::readv(
724 this.fd.as_fd().as_raw_fd(),
725 this.slices.as_ptr() as _,
726 this.slices.len() as _
727 )
728 )
729 }
730}
731
732impl<T: IoBuf, S: AsFd> OpCode for Send<T, S> {
733 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
734 Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
735 }
736
737 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
738 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
739 }
740
741 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
742 let slice = self.buffer.as_slice();
743 syscall!(
744 break libc::write(
745 self.fd.as_fd().as_raw_fd(),
746 slice.as_ptr() as _,
747 slice.len()
748 )
749 )
750 }
751}
752
753impl<T: IoVectoredBuf, S: AsFd> OpCode for SendVectored<T, S> {
754 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
755 Ok(Decision::wait_writable(self.fd.as_fd().as_raw_fd()))
756 }
757
758 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
759 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
760 }
761
762 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
763 let this = unsafe { self.get_unchecked_mut() };
764 this.slices = unsafe { this.buffer.io_slices() };
765 syscall!(
766 break libc::writev(
767 this.fd.as_fd().as_raw_fd(),
768 this.slices.as_ptr() as _,
769 this.slices.len() as _
770 )
771 )
772 }
773}
774
775impl<S: AsFd> OpCode for crate::op::managed::RecvManaged<S> {
776 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
777 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.pre_submit()
778 }
779
780 fn op_type(self: Pin<&mut Self>) -> Option<crate::OpType> {
781 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.op_type()
782 }
783
784 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
785 unsafe { self.map_unchecked_mut(|this| &mut this.op) }.operate()
786 }
787}
788
789pub struct RecvFrom<T: IoBufMut, S> {
791 pub(crate) fd: S,
792 pub(crate) buffer: T,
793 pub(crate) addr: SockAddrStorage,
794 pub(crate) addr_len: socklen_t,
795 _p: PhantomPinned,
796}
797
798impl<T: IoBufMut, S> RecvFrom<T, S> {
799 pub fn new(fd: S, buffer: T) -> Self {
801 let addr = SockAddrStorage::zeroed();
802 let addr_len = addr.size_of();
803 Self {
804 fd,
805 buffer,
806 addr,
807 addr_len,
808 _p: PhantomPinned,
809 }
810 }
811}
812
813impl<T: IoBufMut, S: AsFd> RecvFrom<T, S> {
814 unsafe fn call(self: Pin<&mut Self>) -> libc::ssize_t {
815 let this = unsafe { self.get_unchecked_mut() };
816 let fd = this.fd.as_fd().as_raw_fd();
817 let slice = this.buffer.as_mut_slice();
818 unsafe {
819 libc::recvfrom(
820 fd,
821 slice.as_mut_ptr() as _,
822 slice.len(),
823 0,
824 &mut this.addr as *mut _ as _,
825 &mut this.addr_len,
826 )
827 }
828 }
829}
830
831impl<T: IoBufMut, S: AsFd> OpCode for RecvFrom<T, S> {
832 fn pre_submit(mut self: Pin<&mut Self>) -> io::Result<Decision> {
833 let fd = self.fd.as_fd().as_raw_fd();
834 syscall!(self.as_mut().call(), wait_readable(fd))
835 }
836
837 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
838 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
839 }
840
841 fn operate(mut self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
842 syscall!(break self.as_mut().call())
843 }
844}
845
846impl<T: IoBufMut, S> IntoInner for RecvFrom<T, S> {
847 type Inner = (T, SockAddrStorage, socklen_t);
848
849 fn into_inner(self) -> Self::Inner {
850 (self.buffer, self.addr, self.addr_len)
851 }
852}
853
854pub struct RecvFromVectored<T: IoVectoredBufMut, S> {
856 pub(crate) fd: S,
857 pub(crate) buffer: T,
858 pub(crate) slices: Vec<IoSliceMut>,
859 pub(crate) addr: SockAddrStorage,
860 pub(crate) msg: libc::msghdr,
861 _p: PhantomPinned,
862}
863
864impl<T: IoVectoredBufMut, S> RecvFromVectored<T, S> {
865 pub fn new(fd: S, buffer: T) -> Self {
867 Self {
868 fd,
869 buffer,
870 slices: vec![],
871 addr: SockAddrStorage::zeroed(),
872 msg: unsafe { std::mem::zeroed() },
873 _p: PhantomPinned,
874 }
875 }
876}
877
878impl<T: IoVectoredBufMut, S: AsFd> RecvFromVectored<T, S> {
879 fn set_msg(&mut self) {
880 self.slices = unsafe { self.buffer.io_slices_mut() };
881 self.msg.msg_name = &mut self.addr as *mut _ as _;
882 self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
883 self.msg.msg_iov = self.slices.as_mut_ptr() as _;
884 self.msg.msg_iovlen = self.slices.len() as _;
885 }
886
887 unsafe fn call(&mut self) -> libc::ssize_t {
888 unsafe { libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0) }
889 }
890}
891
892impl<T: IoVectoredBufMut, S: AsFd> OpCode for RecvFromVectored<T, S> {
893 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
894 let this = unsafe { self.get_unchecked_mut() };
895 this.set_msg();
896 syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
897 }
898
899 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
900 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
901 }
902
903 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
904 let this = unsafe { self.get_unchecked_mut() };
905 syscall!(break this.call())
906 }
907}
908
909impl<T: IoVectoredBufMut, S> IntoInner for RecvFromVectored<T, S> {
910 type Inner = (T, SockAddrStorage, socklen_t);
911
912 fn into_inner(self) -> Self::Inner {
913 (self.buffer, self.addr, self.msg.msg_namelen)
914 }
915}
916
917pub struct SendTo<T: IoBuf, S> {
919 pub(crate) fd: S,
920 pub(crate) buffer: T,
921 pub(crate) addr: SockAddr,
922 _p: PhantomPinned,
923}
924
925impl<T: IoBuf, S> SendTo<T, S> {
926 pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
928 Self {
929 fd,
930 buffer,
931 addr,
932 _p: PhantomPinned,
933 }
934 }
935}
936
937impl<T: IoBuf, S: AsFd> SendTo<T, S> {
938 unsafe fn call(&self) -> libc::ssize_t {
939 let slice = self.buffer.as_slice();
940 unsafe {
941 libc::sendto(
942 self.fd.as_fd().as_raw_fd(),
943 slice.as_ptr() as _,
944 slice.len(),
945 0,
946 self.addr.as_ptr().cast(),
947 self.addr.len(),
948 )
949 }
950 }
951}
952
953impl<T: IoBuf, S: AsFd> OpCode for SendTo<T, S> {
954 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
955 syscall!(self.call(), wait_writable(self.fd.as_fd().as_raw_fd()))
956 }
957
958 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
959 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
960 }
961
962 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
963 syscall!(break self.call())
964 }
965}
966
967impl<T: IoBuf, S> IntoInner for SendTo<T, S> {
968 type Inner = T;
969
970 fn into_inner(self) -> Self::Inner {
971 self.buffer
972 }
973}
974
975pub struct SendToVectored<T: IoVectoredBuf, S> {
977 pub(crate) fd: S,
978 pub(crate) buffer: T,
979 pub(crate) addr: SockAddr,
980 pub(crate) slices: Vec<IoSlice>,
981 pub(crate) msg: libc::msghdr,
982 _p: PhantomPinned,
983}
984
985impl<T: IoVectoredBuf, S> SendToVectored<T, S> {
986 pub fn new(fd: S, buffer: T, addr: SockAddr) -> Self {
988 Self {
989 fd,
990 buffer,
991 addr,
992 slices: vec![],
993 msg: unsafe { std::mem::zeroed() },
994 _p: PhantomPinned,
995 }
996 }
997}
998
999impl<T: IoVectoredBuf, S: AsFd> SendToVectored<T, S> {
1000 fn set_msg(&mut self) {
1001 self.slices = unsafe { self.buffer.io_slices() };
1002 self.msg.msg_name = &mut self.addr as *mut _ as _;
1003 self.msg.msg_namelen = std::mem::size_of_val(&self.addr) as _;
1004 self.msg.msg_iov = self.slices.as_mut_ptr() as _;
1005 self.msg.msg_iovlen = self.slices.len() as _;
1006 }
1007
1008 unsafe fn call(&self) -> libc::ssize_t {
1009 unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0) }
1010 }
1011}
1012
1013impl<T: IoVectoredBuf, S: AsFd> OpCode for SendToVectored<T, S> {
1014 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1015 let this = unsafe { self.get_unchecked_mut() };
1016 this.set_msg();
1017 syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
1018 }
1019
1020 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1021 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1022 }
1023
1024 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1025 syscall!(break self.call())
1026 }
1027}
1028
1029impl<T: IoVectoredBuf, S> IntoInner for SendToVectored<T, S> {
1030 type Inner = T;
1031
1032 fn into_inner(self) -> Self::Inner {
1033 self.buffer
1034 }
1035}
1036
1037impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> RecvMsg<T, C, S> {
1038 unsafe fn call(&mut self) -> libc::ssize_t {
1039 unsafe { libc::recvmsg(self.fd.as_fd().as_raw_fd(), &mut self.msg, 0) }
1040 }
1041}
1042
1043impl<T: IoVectoredBufMut, C: IoBufMut, S: AsFd> OpCode for RecvMsg<T, C, S> {
1044 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1045 let this = unsafe { self.get_unchecked_mut() };
1046 unsafe { this.set_msg() };
1047 syscall!(this.call(), wait_readable(this.fd.as_fd().as_raw_fd()))
1048 }
1049
1050 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1051 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1052 }
1053
1054 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1055 let this = unsafe { self.get_unchecked_mut() };
1056 syscall!(break this.call())
1057 }
1058}
1059
1060impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> SendMsg<T, C, S> {
1061 unsafe fn call(&self) -> libc::ssize_t {
1062 unsafe { libc::sendmsg(self.fd.as_fd().as_raw_fd(), &self.msg, 0) }
1063 }
1064}
1065
1066impl<T: IoVectoredBuf, C: IoBuf, S: AsFd> OpCode for SendMsg<T, C, S> {
1067 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1068 let this = unsafe { self.get_unchecked_mut() };
1069 unsafe { this.set_msg() };
1070 syscall!(this.call(), wait_writable(this.fd.as_fd().as_raw_fd()))
1071 }
1072
1073 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1074 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1075 }
1076
1077 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1078 syscall!(break self.call())
1079 }
1080}
1081
1082impl<S: AsFd> OpCode for PollOnce<S> {
1083 fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
1084 Ok(Decision::wait_for(
1085 self.fd.as_fd().as_raw_fd(),
1086 self.interest,
1087 ))
1088 }
1089
1090 fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
1091 Some(OpType::Fd(self.fd.as_fd().as_raw_fd()))
1092 }
1093
1094 fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
1095 Poll::Ready(Ok(0))
1096 }
1097}