1#![allow(clippy::non_send_fields_in_send_ty)]
11
12use std::future::Future;
13use std::marker::PhantomData;
14use std::mem::{self, size_of, MaybeUninit};
15use std::pin::Pin;
16use std::task::{self, Poll};
17use std::{io, ptr};
18
19use crate::cancel::{Cancel, CancelOp, CancelResult};
20use crate::extract::{Extract, Extractor};
21use crate::fd::{AsyncFd, Descriptor, File};
22use crate::io::{
23 Buf, BufIdx, BufMut, BufMutSlice, BufSlice, ReadBuf, ReadBufPool, ReadNBuf, SkipBuf,
24};
25use crate::op::{op_async_iter, op_future, poll_state, OpState};
26use crate::{libc, man_link, SubmissionQueue};
27
28#[doc = man_link!(socket(2))]
30pub const fn socket<D: Descriptor>(
31 sq: SubmissionQueue,
32 domain: libc::c_int,
33 r#type: libc::c_int,
34 protocol: libc::c_int,
35 flags: libc::c_int,
36) -> Socket<D> {
37 Socket {
38 sq: Some(sq),
39 state: OpState::NotStarted((domain, r#type, protocol, flags)),
40 kind: PhantomData,
41 }
42}
43
44impl<D: Descriptor> AsyncFd<D> {
46 #[doc = man_link!(connect(2))]
48 pub fn connect<'fd, A>(&'fd self, address: impl Into<Box<A>>) -> Connect<'fd, A, D>
49 where
50 A: SocketAddress,
51 {
52 let address = address.into();
53 Connect::new(self, address, ())
54 }
55
56 #[doc = man_link!(send(2))]
58 pub const fn send<'fd, B>(&'fd self, buf: B, flags: libc::c_int) -> Send<'fd, B, D>
59 where
60 B: Buf,
61 {
62 Send::new(self, buf, (libc::IORING_OP_SEND as u8, flags))
63 }
64
65 pub const fn send_zc<'fd, B>(&'fd self, buf: B, flags: libc::c_int) -> Send<'fd, B, D>
78 where
79 B: Buf,
80 {
81 Send::new(self, buf, (libc::IORING_OP_SEND_ZC as u8, flags))
82 }
83
84 pub const fn send_all<'fd, B>(&'fd self, buf: B) -> SendAll<'fd, B, D>
87 where
88 B: Buf,
89 {
90 SendAll::new(self, buf)
91 }
92
93 #[doc = man_link!(sendmsg(2))]
95 pub fn send_vectored<'fd, B, const N: usize>(
96 &'fd self,
97 bufs: B,
98 flags: libc::c_int,
99 ) -> SendMsg<'fd, B, NoAddress, N, D>
100 where
101 B: BufSlice<N>,
102 {
103 self.sendmsg(libc::IORING_OP_SENDMSG as u8, bufs, NoAddress, flags)
104 }
105
106 pub fn send_vectored_zc<'fd, B, const N: usize>(
109 &'fd self,
110 bufs: B,
111 flags: libc::c_int,
112 ) -> SendMsg<'fd, B, NoAddress, N, D>
113 where
114 B: BufSlice<N>,
115 {
116 self.sendmsg(libc::IORING_OP_SENDMSG_ZC as u8, bufs, NoAddress, flags)
117 }
118
119 pub fn send_all_vectored<'fd, B, const N: usize>(
123 &'fd self,
124 bufs: B,
125 ) -> SendAllVectored<'fd, B, N, D>
126 where
127 B: BufSlice<N>,
128 {
129 SendAllVectored::new(self, bufs)
130 }
131
132 #[doc = man_link!(send(2))]
134 pub const fn sendto<'fd, B, A>(
135 &'fd self,
136 buf: B,
137 address: A,
138 flags: libc::c_int,
139 ) -> SendTo<'fd, B, A, D>
140 where
141 B: Buf,
142 A: SocketAddress,
143 {
144 SendTo::new(self, buf, address, (libc::IORING_OP_SEND as u8, flags))
145 }
146
147 pub const fn sendto_zc<'fd, B, A>(
152 &'fd self,
153 buf: B,
154 address: A,
155 flags: libc::c_int,
156 ) -> SendTo<'fd, B, A, D>
157 where
158 B: Buf,
159 A: SocketAddress,
160 {
161 SendTo::new(self, buf, address, (libc::IORING_OP_SEND_ZC as u8, flags))
162 }
163
164 #[doc = man_link!(sendmsg(2))]
166 pub fn sendto_vectored<'fd, B, A, const N: usize>(
167 &'fd self,
168 bufs: B,
169 address: A,
170 flags: libc::c_int,
171 ) -> SendMsg<'fd, B, A, N, D>
172 where
173 B: BufSlice<N>,
174 A: SocketAddress,
175 {
176 self.sendmsg(libc::IORING_OP_SENDMSG as u8, bufs, address, flags)
177 }
178
179 pub fn sendto_vectored_zc<'fd, B, A, const N: usize>(
182 &'fd self,
183 bufs: B,
184 address: A,
185 flags: libc::c_int,
186 ) -> SendMsg<'fd, B, A, N, D>
187 where
188 B: BufSlice<N>,
189 A: SocketAddress,
190 {
191 self.sendmsg(libc::IORING_OP_SENDMSG_ZC as u8, bufs, address, flags)
192 }
193
194 fn sendmsg<'fd, B, A, const N: usize>(
195 &'fd self,
196 op: u8,
197 bufs: B,
198 address: A,
199 flags: libc::c_int,
200 ) -> SendMsg<'fd, B, A, N, D>
201 where
202 B: BufSlice<N>,
203 A: SocketAddress,
204 {
205 let msg = unsafe { mem::zeroed() };
207 let iovecs = unsafe { bufs.as_iovecs() };
208 SendMsg::new(self, bufs, address, msg, iovecs, (op, flags))
209 }
210
211 #[doc = man_link!(recv(2))]
214 pub const fn recv<'fd, B>(&'fd self, buf: B, flags: libc::c_int) -> Recv<'fd, B, D>
215 where
216 B: BufMut,
217 {
218 Recv::new(self, buf, flags)
219 }
220
221 pub const fn multishot_recv<'fd>(
232 &'fd self,
233 pool: ReadBufPool,
234 flags: libc::c_int,
235 ) -> MultishotRecv<'fd, D> {
236 MultishotRecv::new(self, pool, flags)
237 }
238
239 pub const fn recv_n<'fd, B>(&'fd self, buf: B, n: usize) -> RecvN<'fd, B, D>
242 where
243 B: BufMut,
244 {
245 RecvN::new(self, buf, n)
246 }
247
248 #[doc = man_link!(recvmsg(2))]
251 pub fn recv_vectored<'fd, B, const N: usize>(
252 &'fd self,
253 mut bufs: B,
254 flags: libc::c_int,
255 ) -> RecvVectored<'fd, B, N, D>
256 where
257 B: BufMutSlice<N>,
258 {
259 let msg = unsafe { Box::new(mem::zeroed()) };
262 let iovecs = unsafe { bufs.as_iovecs_mut() };
263 RecvVectored::new(self, bufs, msg, iovecs, flags)
264 }
265
266 pub fn recv_n_vectored<'fd, B, const N: usize>(
269 &'fd self,
270 bufs: B,
271 n: usize,
272 ) -> RecvNVectored<'fd, B, N, D>
273 where
274 B: BufMutSlice<N>,
275 {
276 RecvNVectored::new(self, bufs, n)
277 }
278
279 #[doc = man_link!(recvmsg(2))]
281 pub fn recvfrom<'fd, B, A>(&'fd self, mut buf: B, flags: libc::c_int) -> RecvFrom<'fd, B, A, D>
282 where
283 B: BufMut,
284 A: SocketAddress,
285 {
286 let msg = unsafe { mem::zeroed() };
288 let (buf_ptr, buf_len) = unsafe { buf.parts_mut() };
289 let iovec = libc::iovec {
290 iov_base: buf_ptr.cast(),
291 iov_len: buf_len as _,
292 };
293 let msg = Box::new((msg, MaybeUninit::uninit()));
294 RecvFrom::new(self, buf, msg, iovec, flags)
295 }
296
297 #[doc = man_link!(recvmsg(2))]
299 pub fn recvfrom_vectored<'fd, B, A, const N: usize>(
300 &'fd self,
301 mut bufs: B,
302 flags: libc::c_int,
303 ) -> RecvFromVectored<'fd, B, A, N, D>
304 where
305 B: BufMutSlice<N>,
306 A: SocketAddress,
307 {
308 let msg = unsafe { mem::zeroed() };
310 let iovecs = unsafe { bufs.as_iovecs_mut() };
311 let msg = Box::new((msg, MaybeUninit::uninit()));
312 RecvFromVectored::new(self, bufs, msg, iovecs, flags)
313 }
314
315 #[doc = man_link!(shutdown(2))]
317 pub const fn shutdown<'fd>(&'fd self, how: std::net::Shutdown) -> Shutdown<'fd, D> {
318 let how = match how {
319 std::net::Shutdown::Read => libc::SHUT_RD,
320 std::net::Shutdown::Write => libc::SHUT_WR,
321 std::net::Shutdown::Both => libc::SHUT_RDWR,
322 };
323 Shutdown::new(self, how)
324 }
325
326 #[doc = man_link!(accept(2))]
331 pub fn accept<'fd, A>(&'fd self) -> Accept<'fd, A, D> {
332 const _: () = assert!(libc::SOCK_CLOEXEC == libc::O_CLOEXEC);
336 self.accept4(D::cloexec_flag())
337 }
338
339 #[doc = man_link!(accept4(2))]
344 pub fn accept4<'fd, A>(&'fd self, flags: libc::c_int) -> Accept<'fd, A, D> {
345 let address = Box::new((MaybeUninit::uninit(), 0));
346 Accept::new(self, address, flags)
347 }
348
349 pub fn multishot_accept<'fd>(&'fd self) -> MultishotAccept<'fd, D> {
355 const _: () = assert!(libc::SOCK_CLOEXEC == libc::O_CLOEXEC);
359 self.multishot_accept4(D::cloexec_flag())
360 }
361
362 pub const fn multishot_accept4<'fd>(&'fd self, flags: libc::c_int) -> MultishotAccept<'fd, D> {
367 MultishotAccept::new(self, flags)
368 }
369
370 #[doc = man_link!(getsockopt(2))]
378 #[doc(alias = "getsockopt")]
379 #[allow(clippy::cast_sign_loss)] pub fn socket_option<'fd, T>(
381 &'fd self,
382 level: libc::c_int,
383 optname: libc::c_int,
384 ) -> SocketOption<'fd, T, D> {
385 let value = Box::new(MaybeUninit::uninit());
387 SocketOption::new(self, value, (level as libc::__u32, optname as libc::__u32))
388 }
389
390 #[doc = man_link!(setsockopt(2))]
398 #[doc(alias = "setsockopt")]
399 #[allow(clippy::cast_sign_loss)] pub fn set_socket_option<'fd, T>(
401 &'fd self,
402 level: libc::c_int,
403 optname: libc::c_int,
404 optvalue: T,
405 ) -> SetSocketOption<'fd, T, D> {
406 let value = Box::new(optvalue);
407 SetSocketOption::new(self, value, (level as libc::__u32, optname as libc::__u32))
408 }
409}
410
411#[derive(Debug)]
415#[must_use = "`Future`s do nothing unless polled"]
416pub struct Socket<D: Descriptor = File> {
417 sq: Option<SubmissionQueue>,
418 state: OpState<(libc::c_int, libc::c_int, libc::c_int, libc::c_int)>,
419 kind: PhantomData<D>,
420}
421
422impl<D: Descriptor + Unpin> Future for Socket<D> {
423 type Output = io::Result<AsyncFd<D>>;
424
425 fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
426 let op_index = poll_state!(
427 Socket,
428 self.state,
429 self.sq.as_ref().unwrap(),
432 ctx,
433 |submission, (domain, r#type, protocol, flags)| unsafe {
434 submission.socket(domain, r#type, protocol, flags);
435 D::create_flags(submission);
436 },
437 );
438
439 let sq = self.sq.as_ref().unwrap();
442 match sq.poll_op(ctx, op_index) {
443 Poll::Ready(result) => {
444 self.state = OpState::Done;
445 match result {
446 Ok((_, fd)) => Poll::Ready(Ok(unsafe {
447 AsyncFd::from_raw(fd, self.sq.take().unwrap())
450 })),
451 Err(err) => Poll::Ready(Err(err)),
452 }
453 }
454 Poll::Pending => Poll::Pending,
455 }
456 }
457}
458
459op_future! {
461 fn AsyncFd::connect -> (),
462 struct Connect<'fd, A: SocketAddress> {
463 address: Box<A>,
465 },
466 setup_state: _unused: (),
467 setup: |submission, fd, (address,), ()| unsafe {
468 let (ptr, len) = SocketAddress::as_ptr(&**address);
469 submission.connect(fd.fd(), ptr, len);
470 },
471 map_result: |result| Ok(debug_assert!(result == 0)),
472 extract: |this, (address,), res| -> Box<A> {
473 debug_assert!(res == 0);
474 Ok(address)
475 },
476}
477
478op_future! {
480 fn AsyncFd::send -> usize,
481 struct Send<'fd, B: Buf> {
482 buf: B,
485 },
486 drop_using: Box,
487 setup_state: flags: (u8, libc::c_int),
488 setup: |submission, fd, (buf,), (op, flags)| unsafe {
489 let (ptr, len) = buf.parts();
490 submission.send(op, fd.fd(), ptr, len, flags);
491 },
492 map_result: |n| {
493 #[allow(clippy::cast_sign_loss)] Ok(n as usize)
495 },
496 extract: |this, (buf,), n| -> (B, usize) {
497 #[allow(clippy::cast_sign_loss)] Ok((buf, n as usize))
499 },
500}
501
502#[derive(Debug)]
504pub struct SendAll<'fd, B, D: Descriptor = File> {
505 send: Extractor<Send<'fd, SkipBuf<B>, D>>,
506}
507
508impl<'fd, B: Buf, D: Descriptor> SendAll<'fd, B, D> {
509 const fn new(fd: &'fd AsyncFd<D>, buf: B) -> SendAll<'fd, B, D> {
510 let buf = SkipBuf { buf, skip: 0 };
511 SendAll {
512 send: Extractor {
514 fut: fd.send(buf, 0),
515 },
516 }
517 }
518
519 fn inner_poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<io::Result<B>> {
522 let this = unsafe { Pin::into_inner_unchecked(self) };
524 let mut send = unsafe { Pin::new_unchecked(&mut this.send) };
525 match send.as_mut().poll(ctx) {
526 Poll::Ready(Ok((_, 0))) => Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
527 Poll::Ready(Ok((mut buf, n))) => {
528 buf.skip += n as u32;
529
530 if let (_, 0) = unsafe { buf.parts() } {
531 return Poll::Ready(Ok(buf.buf));
533 }
534
535 send.set(send.fut.fd.send(buf, 0).extract());
536 unsafe { Pin::new_unchecked(this) }.inner_poll(ctx)
537 }
538 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
539 Poll::Pending => Poll::Pending,
540 }
541 }
542}
543
544impl<'fd, B, D: Descriptor> Cancel for SendAll<'fd, B, D> {
545 fn try_cancel(&mut self) -> CancelResult {
546 self.send.try_cancel()
547 }
548
549 fn cancel(&mut self) -> CancelOp {
550 self.send.cancel()
551 }
552}
553
554impl<'fd, B: Buf, D: Descriptor> Future for SendAll<'fd, B, D> {
555 type Output = io::Result<()>;
556
557 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
558 self.inner_poll(ctx).map_ok(|_| ())
559 }
560}
561
562impl<'fd, B: Buf, D: Descriptor> Extract for SendAll<'fd, B, D> {}
563
564impl<'fd, B: Buf, D: Descriptor> Future for Extractor<SendAll<'fd, B, D>> {
565 type Output = io::Result<B>;
566
567 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
568 unsafe { Pin::map_unchecked_mut(self, |s| &mut s.fut) }.inner_poll(ctx)
570 }
571}
572
573op_future! {
575 fn AsyncFd::sendto -> usize,
576 struct SendTo<'fd, B: Buf, A: SocketAddress> {
577 buf: B,
580 address: A,
582 },
583 drop_using: Box,
584 setup_state: flags: (u8, libc::c_int),
585 setup: |submission, fd, (buf, address), (op, flags)| unsafe {
586 let (buf, buf_len) = buf.parts();
587 let (addr, addr_len) = SocketAddress::as_ptr(address);
588 submission.sendto(op, fd.fd(), buf, buf_len, addr, addr_len, flags);
589 },
590 map_result: |n| {
591 #[allow(clippy::cast_sign_loss)] Ok(n as usize)
593 },
594 extract: |this, (buf, _), n| -> (B, usize) {
595 #[allow(clippy::cast_sign_loss)] Ok((buf, n as usize))
597 },
598}
599
600op_future! {
602 fn AsyncFd::send_vectored -> usize,
603 struct SendMsg<'fd, B: BufSlice<N>, A: SocketAddress; const N: usize> {
604 bufs: B,
607 address: A,
609 msg: libc::msghdr,
618 iovecs: [libc::iovec; N],
619 },
620 drop_using: Box,
621 impl !Unpin,
623 setup_state: flags: (u8, libc::c_int),
624 setup: |submission, fd, (_, address, msg, iovecs), (op, flags)| unsafe {
625 msg.msg_iov = iovecs.as_mut_ptr();
626 msg.msg_iovlen = N;
627 let (addr, addr_len) = SocketAddress::as_ptr(address);
628 msg.msg_name = addr.cast_mut().cast();
629 msg.msg_namelen = addr_len;
630 submission.sendmsg(op, fd.fd(), &*msg, flags);
631 },
632 map_result: |n| {
633 #[allow(clippy::cast_sign_loss)] Ok(n as usize)
635 },
636 extract: |this, (buf, _, _, _), n| -> (B, usize) {
637 #[allow(clippy::cast_sign_loss)] Ok((buf, n as usize))
639 },
640}
641
642#[derive(Debug)]
644pub struct SendAllVectored<'fd, B, const N: usize, D: Descriptor = File> {
645 send: Extractor<SendMsg<'fd, B, NoAddress, N, D>>,
646 skip: u64,
647}
648
649impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> SendAllVectored<'fd, B, N, D> {
650 fn new(fd: &'fd AsyncFd<D>, bufs: B) -> SendAllVectored<'fd, B, N, D> {
651 SendAllVectored {
652 send: fd.send_vectored(bufs, 0).extract(),
653 skip: 0,
654 }
655 }
656
657 fn inner_poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<io::Result<B>> {
660 let this = unsafe { Pin::into_inner_unchecked(self) };
662 let mut send = unsafe { Pin::new_unchecked(&mut this.send) };
663 match send.as_mut().poll(ctx) {
664 Poll::Ready(Ok((_, 0))) => Poll::Ready(Err(io::ErrorKind::WriteZero.into())),
665 Poll::Ready(Ok((bufs, n))) => {
666 this.skip += n as u64;
667
668 let mut iovecs = unsafe { bufs.as_iovecs() };
669 let mut skip = this.skip;
670 for iovec in &mut iovecs {
671 if iovec.iov_len as u64 <= skip {
672 skip -= iovec.iov_len as u64;
674 iovec.iov_len = 0;
675 } else {
676 iovec.iov_len -= skip as usize;
677 break;
678 }
679 }
680
681 if iovecs[N - 1].iov_len == 0 {
682 return Poll::Ready(Ok(bufs));
684 }
685
686 let msg = unsafe { mem::zeroed() };
688 let op = libc::IORING_OP_SENDMSG as u8;
689 send.set(
690 SendMsg::new(send.fut.fd, bufs, NoAddress, msg, iovecs, (op, 0)).extract(),
691 );
692 unsafe { Pin::new_unchecked(this) }.inner_poll(ctx)
693 }
694 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
695 Poll::Pending => Poll::Pending,
696 }
697 }
698}
699
700impl<'fd, B, const N: usize, D: Descriptor> Cancel for SendAllVectored<'fd, B, N, D> {
701 fn try_cancel(&mut self) -> CancelResult {
702 self.send.try_cancel()
703 }
704
705 fn cancel(&mut self) -> CancelOp {
706 self.send.cancel()
707 }
708}
709
710impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Future for SendAllVectored<'fd, B, N, D> {
711 type Output = io::Result<()>;
712
713 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
714 self.inner_poll(ctx).map_ok(|_| ())
715 }
716}
717
718impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Extract for SendAllVectored<'fd, B, N, D> {}
719
720impl<'fd, B: BufSlice<N>, const N: usize, D: Descriptor> Future
721 for Extractor<SendAllVectored<'fd, B, N, D>>
722{
723 type Output = io::Result<B>;
724
725 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
726 unsafe { Pin::map_unchecked_mut(self, |s| &mut s.fut) }.inner_poll(ctx)
727 }
728}
729
730op_future! {
732 fn AsyncFd::recv -> B,
733 struct Recv<'fd, B: BufMut> {
734 buf: B,
737 },
738 drop_using: Box,
739 setup_state: flags: libc::c_int,
740 setup: |submission, fd, (buf,), flags| unsafe {
741 let (ptr, len) = buf.parts_mut();
742 submission.recv(fd.fd(), ptr, len, flags);
743 if let Some(buf_group) = buf.buffer_group() {
744 submission.set_buffer_select(buf_group.0);
745 }
746 },
747 map_result: |this, (mut buf,), buf_idx, n| {
748 #[allow(clippy::cast_sign_loss)] unsafe { buf.buffer_init(BufIdx(buf_idx), n as u32) };
752 Ok(buf)
753 },
754}
755
756op_async_iter! {
758 fn AsyncFd::multishot_recv -> ReadBuf,
759 struct MultishotRecv<'fd> {
760 buf_pool: ReadBufPool,
762 },
763 setup_state: flags: libc::c_int,
764 setup: |submission, this, flags| unsafe {
765 submission.multishot_recv(this.fd.fd(), flags, this.buf_pool.group_id().0);
766 },
767 map_result: |this, buf_idx, n| {
768 if n == 0 {
769 this.state = crate::op::OpState::Done;
771 }
772 #[allow(clippy::cast_sign_loss)] unsafe { this.buf_pool.new_buffer(BufIdx(buf_idx), n as u32) }
776 },
777}
778
779#[derive(Debug)]
781pub struct RecvN<'fd, B, D: Descriptor = File> {
782 recv: Recv<'fd, ReadNBuf<B>, D>,
783 left: usize,
785}
786
787impl<'fd, B: BufMut, D: Descriptor> RecvN<'fd, B, D> {
788 const fn new(fd: &'fd AsyncFd<D>, buf: B, n: usize) -> RecvN<'fd, B, D> {
789 let buf = ReadNBuf { buf, last_read: 0 };
790 RecvN {
791 recv: fd.recv(buf, 0),
792 left: n,
793 }
794 }
795}
796
797impl<'fd, B, D: Descriptor> Cancel for RecvN<'fd, B, D> {
798 fn try_cancel(&mut self) -> CancelResult {
799 self.recv.try_cancel()
800 }
801
802 fn cancel(&mut self) -> CancelOp {
803 self.recv.cancel()
804 }
805}
806
807impl<'fd, B: BufMut, D: Descriptor> Future for RecvN<'fd, B, D> {
808 type Output = io::Result<B>;
809
810 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
811 let this = unsafe { Pin::into_inner_unchecked(self) };
813 let mut recv = unsafe { Pin::new_unchecked(&mut this.recv) };
814 match recv.as_mut().poll(ctx) {
815 Poll::Ready(Ok(buf)) => {
816 if buf.last_read == 0 {
817 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
818 }
819
820 if buf.last_read >= this.left {
821 return Poll::Ready(Ok(buf.buf));
823 }
824
825 this.left -= buf.last_read;
826
827 recv.set(recv.fd.recv(buf, 0));
828 unsafe { Pin::new_unchecked(this) }.poll(ctx)
829 }
830 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
831 Poll::Pending => Poll::Pending,
832 }
833 }
834}
835
836op_future! {
838 fn AsyncFd::recv_vectored -> (B, libc::c_int),
839 struct RecvVectored<'fd, B: BufMutSlice<N>; const N: usize> {
840 bufs: B,
843 msg: Box<libc::msghdr>,
846 iovecs: [libc::iovec; N],
855 },
856 drop_using: Box,
857 impl !Unpin,
859 setup_state: flags: libc::c_int,
860 setup: |submission, fd, (_, msg, iovecs), flags| unsafe {
861 msg.msg_iov = iovecs.as_mut_ptr();
862 msg.msg_iovlen = N;
863 submission.recvmsg(fd.fd(), &**msg, flags);
864 },
865 map_result: |this, (mut bufs, msg, _), n| {
866 #[allow(clippy::cast_sign_loss)] unsafe { bufs.set_init(n as usize) };
870 Ok((bufs, msg.msg_flags))
871 },
872}
873
874#[derive(Debug)]
876pub struct RecvNVectored<'fd, B, const N: usize, D: Descriptor = File> {
877 recv: RecvVectored<'fd, ReadNBuf<B>, N, D>,
878 left: usize,
880}
881
882impl<'fd, B: BufMutSlice<N>, const N: usize, D: Descriptor> RecvNVectored<'fd, B, N, D> {
883 fn new(fd: &'fd AsyncFd<D>, buf: B, n: usize) -> RecvNVectored<'fd, B, N, D> {
884 let bufs = ReadNBuf { buf, last_read: 0 };
885 RecvNVectored {
886 recv: fd.recv_vectored(bufs, 0),
887 left: n,
888 }
889 }
890}
891
892impl<'fd, B, const N: usize, D: Descriptor> Cancel for RecvNVectored<'fd, B, N, D> {
893 fn try_cancel(&mut self) -> CancelResult {
894 self.recv.try_cancel()
895 }
896
897 fn cancel(&mut self) -> CancelOp {
898 self.recv.cancel()
899 }
900}
901
902impl<'fd, B: BufMutSlice<N>, const N: usize, D: Descriptor> Future for RecvNVectored<'fd, B, N, D> {
903 type Output = io::Result<B>;
904
905 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
906 let this = unsafe { Pin::into_inner_unchecked(self) };
908 let mut recv = unsafe { Pin::new_unchecked(&mut this.recv) };
909 match recv.as_mut().poll(ctx) {
910 Poll::Ready(Ok((bufs, _))) => {
911 if bufs.last_read == 0 {
912 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
913 }
914
915 if bufs.last_read >= this.left {
916 return Poll::Ready(Ok(bufs.buf));
918 }
919
920 this.left -= bufs.last_read;
921
922 recv.set(recv.fd.recv_vectored(bufs, 0));
923 unsafe { Pin::new_unchecked(this) }.poll(ctx)
924 }
925 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
926 Poll::Pending => Poll::Pending,
927 }
928 }
929}
930
931op_future! {
933 fn AsyncFd::recvfrom -> (B, A, libc::c_int),
934 struct RecvFrom<'fd, B: BufMut, A: SocketAddress> {
935 buf: B,
938 msg: Box<(libc::msghdr, MaybeUninit<A>)>,
941 iovec: libc::iovec,
950 },
951 drop_using: Box,
952 impl !Unpin,
954 setup_state: flags: libc::c_int,
955 setup: |submission, fd, (buf, msg, iovec), flags| unsafe {
956 let address = &mut msg.1;
957 let msg = &mut msg.0;
958 msg.msg_iov = &mut *iovec;
959 msg.msg_iovlen = 1;
960 let (addr, addr_len) = SocketAddress::as_mut_ptr(address);
961 msg.msg_name = addr.cast();
962 msg.msg_namelen = addr_len;
963 submission.recvmsg(fd.fd(), &*msg, flags);
964 if let Some(buf_group) = buf.buffer_group() {
965 submission.set_buffer_select(buf_group.0);
966 }
967 },
968 map_result: |this, (mut buf, msg, _), buf_idx, n| {
969 #[allow(clippy::cast_sign_loss)] unsafe { buf.buffer_init(BufIdx(buf_idx), n as u32) };
973 let address = unsafe { SocketAddress::init(msg.1, msg.0.msg_namelen) };
975 Ok((buf, address, msg.0.msg_flags))
976 },
977}
978
979op_future! {
981 fn AsyncFd::recvfrom_vectored -> (B, A, libc::c_int),
982 struct RecvFromVectored<'fd, B: BufMutSlice<N>, A: SocketAddress; const N: usize> {
983 bufs: B,
986 msg: Box<(libc::msghdr, MaybeUninit<A>)>,
989 iovecs: [libc::iovec; N],
998 },
999 drop_using: Box,
1000 impl !Unpin,
1002 setup_state: flags: libc::c_int,
1003 setup: |submission, fd, (_, msg, iovecs), flags| unsafe {
1004 let address = &mut msg.1;
1005 let msg = &mut msg.0;
1006 msg.msg_iov = iovecs.as_mut_ptr();
1007 msg.msg_iovlen = N;
1008 let (addr, addr_len) = SocketAddress::as_mut_ptr(address);
1009 msg.msg_name = addr.cast();
1010 msg.msg_namelen = addr_len;
1011 submission.recvmsg(fd.fd(), &*msg, flags);
1012 },
1013 map_result: |this, (mut bufs, msg, _), n| {
1014 #[allow(clippy::cast_sign_loss)] unsafe { bufs.set_init(n as usize) };
1018 let address = unsafe { SocketAddress::init(msg.1, msg.0.msg_namelen) };
1020 Ok((bufs, address, msg.0.msg_flags))
1021 },
1022}
1023
1024op_future! {
1026 fn AsyncFd::shutdown -> (),
1027 struct Shutdown<'fd> {
1028 },
1030 setup_state: flags: libc::c_int,
1031 setup: |submission, fd, (), how| unsafe {
1032 submission.shutdown(fd.fd(), how);
1033 },
1034 map_result: |n| Ok(debug_assert!(n == 0)),
1035}
1036
1037op_future! {
1039 fn AsyncFd::accept -> (AsyncFd<D>, A),
1040 struct Accept<'fd, A: SocketAddress> {
1041 address: Box<(MaybeUninit<A>, libc::socklen_t)>,
1044 },
1045 setup_state: flags: libc::c_int,
1046 setup: |submission, fd, (address,), flags| unsafe {
1047 let (ptr, len) = SocketAddress::as_mut_ptr(&mut address.0);
1048 address.1 = len;
1049 submission.accept(fd.fd(), ptr, &mut address.1, flags);
1050 submission.set_async();
1051 D::create_flags(submission);
1052 },
1053 map_result: |this, (address,), fd| {
1054 let sq = this.fd.sq.clone();
1055 let stream = unsafe { AsyncFd::from_raw(fd, sq) };
1057 let len = address.1;
1058 let address = unsafe { SocketAddress::init(address.0, len) };
1060 Ok((stream, address))
1061 },
1062}
1063
1064op_async_iter! {
1066 fn AsyncFd::multishot_accept -> AsyncFd<D>,
1067 struct MultishotAccept<'fd> {
1068 },
1070 setup_state: flags: libc::c_int,
1071 setup: |submission, this, flags| unsafe {
1072 submission.multishot_accept(this.fd.fd(), flags);
1073 submission.set_async();
1074 D::create_flags(submission);
1075 },
1076 map_result: |this, _flags, fd| {
1077 let sq = this.fd.sq.clone();
1078 unsafe { AsyncFd::from_raw(fd, sq) }
1080 },
1081}
1082
1083op_future! {
1085 fn AsyncFd::socket_option -> T,
1086 struct SocketOption<'fd, T> {
1087 value: Box<MaybeUninit<T>>,
1090 },
1091 setup_state: flags: (libc::__u32, libc::__u32),
1092 setup: |submission, fd, (value,), (level, optname)| unsafe {
1093 let optvalue = ptr::addr_of_mut!(**value).cast();
1094 let optlen = size_of::<T>() as u32;
1095 submission.uring_command(libc::SOCKET_URING_OP_GETSOCKOPT, fd.fd(), level, optname, optvalue, optlen);
1096 },
1097 map_result: |this, (value,), optlen| {
1098 debug_assert!(optlen == (size_of::<T>() as i32));
1099 Ok(unsafe { MaybeUninit::assume_init(*value) })
1102 },
1103}
1104
1105op_future! {
1107 fn AsyncFd::set_socket_option -> (),
1108 struct SetSocketOption<'fd, T> {
1109 value: Box<T>,
1112 },
1113 setup_state: flags: (libc::__u32, libc::__u32),
1114 setup: |submission, fd, (value,), (level, optname)| unsafe {
1115 let optvalue = ptr::addr_of_mut!(**value).cast();
1116 let optlen = size_of::<T>() as u32;
1117 submission.uring_command(libc::SOCKET_URING_OP_SETSOCKOPT, fd.fd(), level, optname, optvalue, optlen);
1118 },
1119 map_result: |result| Ok(debug_assert!(result == 0)),
1120 extract: |this, (value,), res| -> Box<T> {
1121 debug_assert!(res == 0);
1122 Ok(value)
1123 },
1124}
1125
1126pub trait SocketAddress: Sized {
1140 unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t);
1155
1156 unsafe fn as_mut_ptr(address: &mut MaybeUninit<Self>)
1171 -> (*mut libc::sockaddr, libc::socklen_t);
1172
1173 unsafe fn init(address: MaybeUninit<Self>, length: libc::socklen_t) -> Self;
1181}
1182
1183impl SocketAddress for (libc::sockaddr, libc::socklen_t) {
1185 unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
1186 (ptr::addr_of!(self.0).cast(), self.1)
1187 }
1188
1189 unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
1190 (
1191 ptr::addr_of_mut!((*this.as_mut_ptr()).0).cast(),
1192 size_of::<libc::sockaddr>() as _,
1193 )
1194 }
1195
1196 unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
1197 debug_assert!(length >= size_of::<libc::sa_family_t>() as _);
1198 let mut this = this.assume_init();
1200 this.1 = length;
1201 this
1202 }
1203}
1204
1205impl SocketAddress for (libc::sockaddr_storage, libc::socklen_t) {
1207 unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
1208 (ptr::addr_of!(self.0).cast(), self.1)
1209 }
1210
1211 unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
1212 (
1213 ptr::addr_of_mut!((*this.as_mut_ptr()).0).cast(),
1214 size_of::<libc::sockaddr_storage>() as _,
1215 )
1216 }
1217
1218 unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
1219 debug_assert!(length >= size_of::<libc::sa_family_t>() as _);
1220 let mut this = this.assume_init();
1222 this.1 = length;
1223 this
1224 }
1225}
1226
1227impl SocketAddress for libc::sockaddr_in {
1229 unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
1230 (
1231 (self as *const libc::sockaddr_in).cast(),
1232 size_of::<Self>() as _,
1233 )
1234 }
1235
1236 unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
1237 (this.as_mut_ptr().cast(), size_of::<Self>() as _)
1238 }
1239
1240 unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
1241 debug_assert!(length == size_of::<Self>() as _);
1242 this.assume_init()
1244 }
1245}
1246
1247impl SocketAddress for libc::sockaddr_in6 {
1249 unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
1250 (
1251 (self as *const libc::sockaddr_in6).cast(),
1252 size_of::<Self>() as _,
1253 )
1254 }
1255
1256 unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
1257 (this.as_mut_ptr().cast(), size_of::<Self>() as _)
1258 }
1259
1260 unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
1261 debug_assert!(length == size_of::<Self>() as _);
1262 this.assume_init()
1264 }
1265}
1266
1267impl SocketAddress for (libc::sockaddr_un, libc::socklen_t) {
1269 unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
1270 (ptr::addr_of!(self.0).cast(), self.1)
1271 }
1272
1273 unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
1274 (
1275 ptr::addr_of_mut!((*this.as_mut_ptr()).0).cast(),
1276 size_of::<libc::sockaddr_un>() as _,
1277 )
1278 }
1279
1280 unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
1281 debug_assert!(length >= size_of::<libc::sa_family_t>() as _);
1282 let mut this = this.assume_init();
1284 this.1 = length;
1285 this
1286 }
1287}
1288
1289#[derive(Debug)]
1296pub struct NoAddress;
1297
1298impl SocketAddress for NoAddress {
1299 unsafe fn as_ptr(&self) -> (*const libc::sockaddr, libc::socklen_t) {
1300 (ptr::null_mut(), 0)
1302 }
1303
1304 unsafe fn as_mut_ptr(this: &mut MaybeUninit<Self>) -> (*mut libc::sockaddr, libc::socklen_t) {
1305 _ = this;
1306 (ptr::null_mut(), 0)
1308 }
1309
1310 unsafe fn init(this: MaybeUninit<Self>, length: libc::socklen_t) -> Self {
1311 debug_assert!(length == 0);
1312 this.assume_init()
1313 }
1314}