// The public future enums in this file (`ConnectFuture`, `BindFuture`, `BindUdpFuture`) carry the
// private state structs `ConnectingState`, `BindingState` and `BindingUdpState` in their
// variants, which is what the `private_interfaces` lint reports.
#![allow(private_interfaces)]
34
35use std::future::Future;
36use std::io;
37use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr};
38use std::os::fd::{AsRawFd, FromRawFd, RawFd};
39use std::pin::Pin;
40use std::task::{Context, Poll};
41
/// Stand-in socket address (`0.0.0.0:0`) used where no real address has been decoded.
const DUMMY_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
43
44pub struct TcpStream {
50 fd: std::os::fd::OwnedFd,
52}
53
54impl TcpStream {
55 pub(crate) unsafe fn from_raw_fd(fd: RawFd) -> io::Result<Self> {
63 #[cfg(unix)]
66 unsafe {
67 let flags = libc::fcntl(fd, libc::F_GETFL);
68 if flags < 0 {
69 return Err(io::Error::last_os_error());
70 }
71 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
72 return Err(io::Error::last_os_error());
73 }
74 }
75
76 Ok(Self {
77 fd: unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) },
80 })
81 }
82
83 pub fn connect(addr: &str) -> ConnectFuture {
97 let Ok(addr) = addr.parse::<SocketAddr>() else {
98 return ConnectFuture::Error(io::Error::new(
101 io::ErrorKind::InvalidInput,
102 "Invalid address format, use IP:PORT",
103 ));
104 };
105
106 ConnectFuture::Connecting(Box::new(ConnectingState {
107 addr,
108 fd: None,
109 started: false,
110 }))
111 }
112
113 pub fn read<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> ReadFuture<'a, 'b> {
119 ReadFuture {
120 stream: Some(self),
121 buf,
122 pos: 0,
123 }
124 }
125
126 pub fn write_all<'a, 'b>(&'a mut self, buf: &'b [u8]) -> WriteAllFuture<'a, 'b> {
132 WriteAllFuture {
133 stream: Some(self),
134 buf,
135 pos: 0,
136 }
137 }
138
139 pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
153 unsafe {
162 let ptr = self as *mut TcpStream;
163 (ReadHalf { _stream: &mut *ptr }, WriteHalf { _stream: &mut *ptr })
164 }
165 }
166
167 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
170 #[cfg(unix)]
171 unsafe {
172 let how = match how {
173 Shutdown::Read => libc::SHUT_RD,
174 Shutdown::Write => libc::SHUT_WR,
175 Shutdown::Both => libc::SHUT_RDWR,
176 };
177 if libc::shutdown(self.as_raw_fd(), how) < 0 {
178 return Err(io::Error::last_os_error());
179 }
180 }
181 #[cfg(not(unix))]
182 {
183 let _ = how;
184 return Err(io::Error::new(
185 io::ErrorKind::Unsupported,
186 "Shutdown not supported on this platform",
187 ));
188 }
189 Ok(())
190 }
191}
192
193impl AsRawFd for TcpStream {
194 fn as_raw_fd(&self) -> RawFd {
195 self.fd.as_raw_fd()
196 }
197}
198
199pub enum ConnectFuture {
202 Error(io::Error),
204 Connecting(Box<ConnectingState>),
207 Done,
209}
210
211struct ConnectingState {
212 addr: SocketAddr,
213 fd: Option<RawFd>,
214 started: bool,
215}
216
217impl Future for ConnectFuture {
218 type Output = io::Result<TcpStream>;
219
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221 match &mut *self {
222 ConnectFuture::Error(e) => {
223 let e = std::mem::replace(e, io::Error::other(""));
224 Poll::Ready(Err(e))
225 },
226 ConnectFuture::Done => panic!("ConnectFuture polled after completion"),
227 ConnectFuture::Connecting(state) => {
228 if !state.started {
229 state.started = true;
230
231 let fd: RawFd = create_socket(state.addr.is_ipv4());
234
235 if fd < 0 {
236 return Poll::Ready(Err(io::Error::last_os_error()));
237 }
238
239 let result = do_connect(fd, state.addr);
242
243 if result < 0 {
244 let err = io::Error::last_os_error();
245 if err.kind() != io::ErrorKind::WouldBlock {
246 unsafe { libc::close(fd) };
247 return Poll::Ready(Err(err));
248 }
249 state.fd = Some(fd);
252 return Poll::Pending;
253 }
254
255 state.fd = Some(fd);
258 }
259
260 if let Some(fd) = state.fd {
263 let mut pfd = libc::pollfd {
264 fd,
265 events: libc::POLLOUT,
266 revents: 0,
267 };
268 let ready = unsafe { libc::poll(&mut pfd, 1, 0) };
269
270 if ready < 0 {
271 let fd = state.fd.take().unwrap();
272 unsafe { libc::close(fd) };
273 return Poll::Ready(Err(io::Error::last_os_error()));
274 }
275
276 if ready == 0 {
277 cx.waker().wake_by_ref();
280 return Poll::Pending;
281 }
282
283 let mut err_val: libc::c_int = 0;
286 let mut err_len: libc::socklen_t =
287 size_of::<libc::c_int>() as libc::socklen_t;
288 unsafe {
289 libc::getsockopt(
290 fd,
291 libc::SOL_SOCKET,
292 libc::SO_ERROR,
293 &mut err_val as *mut _ as *mut _,
294 &mut err_len,
295 );
296 }
297 if err_val != 0 {
298 let fd = state.fd.take().unwrap();
299 unsafe { libc::close(fd) };
300 return Poll::Ready(Err(io::Error::from_raw_os_error(err_val)));
301 }
302
303 let fd = state.fd.take().unwrap();
306 let stream = match unsafe { TcpStream::from_raw_fd(fd) } {
307 Ok(s) => s,
308 Err(e) => return Poll::Ready(Err(e)),
309 };
310 *self = ConnectFuture::Done;
311 Poll::Ready(Ok(stream))
312 } else {
313 Poll::Pending
314 }
315 },
316 }
317 }
318}
319
320#[cfg(unix)]
323fn create_socket(ipv4: bool) -> RawFd {
324 unsafe {
325 let domain = if ipv4 { libc::AF_INET } else { libc::AF_INET6 };
326
327 #[cfg(target_os = "linux")]
328 let fd = libc::socket(domain, libc::SOCK_STREAM | libc::SOCK_CLOEXEC, 0);
329
330 #[cfg(not(target_os = "linux"))]
331 let fd = libc::socket(domain, libc::SOCK_STREAM, 0);
332
333 if fd < 0 {
334 return fd;
335 }
336
337 #[cfg(not(target_os = "linux"))]
338 {
339 if libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) < 0 {
341 libc::close(fd);
342 return -1;
343 }
344 }
345
346 let flags = libc::fcntl(fd, libc::F_GETFL);
348 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
349 libc::close(fd);
350 return -1;
351 }
352
353 fd
354 }
355}
356
357#[cfg(unix)]
360fn do_connect(fd: RawFd, addr: SocketAddr) -> i32 {
361 unsafe {
362 if addr.is_ipv4() {
363 if let SocketAddr::V4(v4) = addr {
364 #[cfg(target_os = "linux")]
365 let sockaddr = libc::sockaddr_in {
366 sin_family: libc::AF_INET as u16,
367 sin_port: v4.port().to_be(),
368 sin_addr: libc::in_addr {
369 s_addr: u32::from_ne_bytes(v4.ip().octets()),
370 },
371 sin_zero: [0; 8],
372 };
373
374 #[cfg(not(target_os = "linux"))]
375 let sockaddr = libc::sockaddr_in {
376 sin_len: size_of::<libc::sockaddr_in>() as u8,
377 sin_family: libc::AF_INET as u8,
378 sin_port: v4.port().to_be(),
379 sin_addr: libc::in_addr {
380 s_addr: u32::from_ne_bytes(v4.ip().octets()),
381 },
382 sin_zero: [0; 8],
383 };
384
385 libc::connect(
386 fd,
387 &sockaddr as *const _ as *const libc::sockaddr,
388 size_of::<libc::sockaddr_in>() as libc::socklen_t,
389 )
390 } else {
391 -1
392 }
393 } else {
394 if let SocketAddr::V6(v6) = addr {
395 #[cfg(target_os = "linux")]
396 let sockaddr = libc::sockaddr_in6 {
397 sin6_family: libc::AF_INET6 as u16,
398 sin6_port: v6.port().to_be(),
399 sin6_flowinfo: v6.flowinfo(),
400 sin6_addr: libc::in6_addr {
401 s6_addr: v6.ip().octets(),
402 },
403 sin6_scope_id: v6.scope_id(),
404 };
405
406 #[cfg(not(target_os = "linux"))]
407 let sockaddr = libc::sockaddr_in6 {
408 sin6_len: size_of::<libc::sockaddr_in6>() as u8,
409 sin6_family: libc::AF_INET6 as u8,
410 sin6_port: v6.port().to_be(),
411 sin6_flowinfo: v6.flowinfo(),
412 sin6_addr: libc::in6_addr {
413 s6_addr: v6.ip().octets(),
414 },
415 sin6_scope_id: v6.scope_id(),
416 };
417
418 libc::connect(
419 fd,
420 &sockaddr as *const _ as *const libc::sockaddr,
421 size_of::<libc::sockaddr_in6>() as libc::socklen_t,
422 )
423 } else {
424 -1
425 }
426 }
427 }
428}
429
430pub struct ReadFuture<'a, 'b> {
433 stream: Option<&'a mut TcpStream>,
434 buf: &'b mut [u8],
435 pos: usize,
436}
437
438impl Future for ReadFuture<'_, '_> {
439 type Output = io::Result<usize>;
440
441 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
442 let stream_fd;
445 let buf_ptr;
446 let buf_len;
447
448 {
449 let stream = self.stream.as_mut().unwrap();
450 stream_fd = stream.as_raw_fd();
451 let pos = self.pos;
452 buf_ptr = self.buf[pos..].as_mut_ptr();
453 buf_len = self.buf[pos..].len();
454 }
455
456 #[cfg(unix)]
457 {
458 let result = unsafe { libc::read(stream_fd, buf_ptr as *mut _, buf_len) };
459
460 if result < 0 {
461 let err = io::Error::last_os_error();
462 if err.kind() == io::ErrorKind::WouldBlock {
463 return Poll::Pending;
468 }
469 return Poll::Ready(Err(err));
470 }
471
472 let n = result as usize;
473 if n == 0 {
474 return Poll::Ready(Ok(0)); }
476
477 self.pos += n;
478 Poll::Ready(Ok(n))
479 }
480
481 #[cfg(not(unix))]
482 {
483 let _ = (stream_fd, buf_ptr, buf_len);
484 Poll::Ready(Err(io::Error::new(
485 io::ErrorKind::Unsupported,
486 "TCP read not yet implemented on this platform",
487 )))
488 }
489 }
490}
491
492pub struct WriteAllFuture<'a, 'b> {
495 stream: Option<&'a mut TcpStream>,
496 buf: &'b [u8],
497 pos: usize,
498}
499
500impl Future for WriteAllFuture<'_, '_> {
501 type Output = io::Result<()>;
502
503 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
504 while self.pos < self.buf.len() {
505 let stream = self.stream.as_mut().unwrap();
506
507 #[cfg(unix)]
508 {
509 let result = unsafe {
510 libc::write(
511 stream.as_raw_fd(),
512 self.buf[self.pos..].as_ptr() as *const _,
513 self.buf[self.pos..].len(),
514 )
515 };
516
517 if result < 0 {
518 let err = io::Error::last_os_error();
519 if err.kind() == io::ErrorKind::WouldBlock {
520 return Poll::Pending;
521 }
522 return Poll::Ready(Err(err));
523 }
524
525 let n = result as usize;
526 if n == 0 {
527 return Poll::Ready(Err(io::Error::new(
528 io::ErrorKind::WriteZero,
529 "write zero byte",
530 )));
531 }
532
533 self.pos += n;
534 }
535
536 #[cfg(not(unix))]
537 {
538 let _ = stream;
539 return Poll::Ready(Err(io::Error::new(
540 io::ErrorKind::Unsupported,
541 "TCP write not yet implemented on this platform",
542 )));
543 }
544 }
545
546 Poll::Ready(Ok(()))
547 }
548}
549
/// Read side of a stream, as produced by [`TcpStream::split`].
///
/// NOTE(review): no methods on this type are visible in this file, and `split` builds both
/// halves from the same `&mut TcpStream`.
pub struct ReadHalf<'a> {
    /// Mutable borrow of the stream that was split; not read anywhere in the visible code.
    _stream: &'a mut TcpStream,
}
555
/// Write side of a stream, as produced by [`TcpStream::split`].
///
/// NOTE(review): no methods on this type are visible in this file, and `split` builds both
/// halves from the same `&mut TcpStream`.
pub struct WriteHalf<'a> {
    /// Mutable borrow of the stream that was split; not read anywhere in the visible code.
    _stream: &'a mut TcpStream,
}
561
562pub struct TcpListener {
568 fd: std::os::fd::OwnedFd,
570}
571
572impl TcpListener {
573 pub fn bind(addr: &str) -> BindFuture {
588 let Ok(addr) = addr.parse::<SocketAddr>() else {
589 return BindFuture::Error(io::Error::new(
590 io::ErrorKind::InvalidInput,
591 "Invalid address format, use IP:PORT",
592 ));
593 };
594
595 BindFuture::Binding(BindingState { addr })
596 }
597
598 pub fn accept(&mut self) -> AcceptFuture<'_> {
601 AcceptFuture { listener: self }
602 }
603
604 pub fn local_addr(&self) -> io::Result<SocketAddr> {
607 #[cfg(unix)]
608 unsafe {
609 let mut addr: libc::sockaddr_storage = std::mem::zeroed();
610 let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
611
612 if libc::getsockname(
613 self.as_raw_fd(),
614 &mut addr as *mut _ as *mut libc::sockaddr,
615 &mut len,
616 ) < 0
617 {
618 return Err(io::Error::last_os_error());
619 }
620
621 Ok(DUMMY_ADDR)
624 }
625
626 #[cfg(not(unix))]
627 {
628 Err(io::Error::new(
629 io::ErrorKind::Unsupported,
630 "local_addr not supported on this platform",
631 ))
632 }
633 }
634}
635
636impl AsRawFd for TcpListener {
637 fn as_raw_fd(&self) -> RawFd {
638 self.fd.as_raw_fd()
639 }
640}
641
642pub enum BindFuture {
645 Error(io::Error),
647 Binding(BindingState),
649 Done,
651}
652
653struct BindingState {
654 addr: SocketAddr,
655}
656
657impl Future for BindFuture {
658 type Output = io::Result<TcpListener>;
659
660 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
661 match &mut *self {
662 BindFuture::Error(e) => {
663 let e = std::mem::replace(e, io::Error::other(""));
664 Poll::Ready(Err(e))
665 },
666 BindFuture::Done => panic!("BindFuture polled after completion"),
667 BindFuture::Binding(state) => {
668 let fd = create_socket(state.addr.is_ipv4());
671
672 if fd < 0 {
673 return Poll::Ready(Err(io::Error::last_os_error()));
674 }
675
676 #[cfg(unix)]
679 unsafe {
680 let opt: i32 = 1;
681 if libc::setsockopt(
682 fd,
683 libc::SOL_SOCKET,
684 libc::SO_REUSEADDR,
685 &opt as *const _ as *const _,
686 size_of::<i32>() as libc::socklen_t,
687 ) < 0
688 {
689 libc::close(fd);
690 return Poll::Ready(Err(io::Error::last_os_error()));
691 }
692
693 let result = do_bind(fd, state.addr);
696 if result < 0 {
697 let err = io::Error::last_os_error();
698 libc::close(fd);
699 return Poll::Ready(Err(err));
700 }
701
702 if libc::listen(fd, 128) < 0 {
705 let err = io::Error::last_os_error();
706 libc::close(fd);
707 return Poll::Ready(Err(err));
708 }
709
710 let listener = TcpListener {
711 fd: std::os::fd::OwnedFd::from_raw_fd(fd),
713 };
714
715 *self = BindFuture::Done;
716 Poll::Ready(Ok(listener))
717 }
718
719 #[cfg(not(unix))]
720 {
721 Poll::Ready(Err(io::Error::new(
722 io::ErrorKind::Unsupported,
723 "TCP bind not yet implemented on this platform",
724 )))
725 }
726 },
727 }
728 }
729}
730
731#[cfg(unix)]
734fn do_bind(fd: RawFd, addr: SocketAddr) -> i32 {
735 unsafe {
736 if addr.is_ipv4() {
737 if let SocketAddr::V4(v4) = addr {
738 #[cfg(target_os = "linux")]
739 let sockaddr = libc::sockaddr_in {
740 sin_family: libc::AF_INET as u16,
741 sin_port: v4.port().to_be(),
742 sin_addr: libc::in_addr {
743 s_addr: u32::from_ne_bytes(v4.ip().octets()),
744 },
745 sin_zero: [0; 8],
746 };
747
748 #[cfg(not(target_os = "linux"))]
749 let sockaddr = libc::sockaddr_in {
750 sin_len: size_of::<libc::sockaddr_in>() as u8,
751 sin_family: libc::AF_INET as u8,
752 sin_port: v4.port().to_be(),
753 sin_addr: libc::in_addr {
754 s_addr: u32::from_ne_bytes(v4.ip().octets()),
755 },
756 sin_zero: [0; 8],
757 };
758
759 libc::bind(
760 fd,
761 &sockaddr as *const _ as *const libc::sockaddr,
762 size_of::<libc::sockaddr_in>() as libc::socklen_t,
763 )
764 } else {
765 -1
766 }
767 } else {
768 if let SocketAddr::V6(v6) = addr {
769 #[cfg(target_os = "linux")]
770 let sockaddr = libc::sockaddr_in6 {
771 sin6_family: libc::AF_INET6 as u16,
772 sin6_port: v6.port().to_be(),
773 sin6_flowinfo: v6.flowinfo(),
774 sin6_addr: libc::in6_addr {
775 s6_addr: v6.ip().octets(),
776 },
777 sin6_scope_id: v6.scope_id(),
778 };
779
780 #[cfg(not(target_os = "linux"))]
781 let sockaddr = libc::sockaddr_in6 {
782 sin6_len: size_of::<libc::sockaddr_in6>() as u8,
783 sin6_family: libc::AF_INET6 as u8,
784 sin6_port: v6.port().to_be(),
785 sin6_flowinfo: v6.flowinfo(),
786 sin6_addr: libc::in6_addr {
787 s6_addr: v6.ip().octets(),
788 },
789 sin6_scope_id: v6.scope_id(),
790 };
791
792 libc::bind(
793 fd,
794 &sockaddr as *const _ as *const libc::sockaddr,
795 size_of::<libc::sockaddr_in6>() as libc::socklen_t,
796 )
797 } else {
798 -1
799 }
800 }
801 }
802}
803
804pub struct AcceptFuture<'a> {
807 listener: &'a mut TcpListener,
808}
809
810impl Future for AcceptFuture<'_> {
811 type Output = io::Result<(TcpStream, SocketAddr)>;
812
813 #[allow(unused_mut)]
814 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
815 #[cfg(unix)]
816 {
817 let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
818 let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
819
820 let fd = unsafe {
821 #[cfg(target_os = "linux")]
822 {
823 libc::accept4(
824 self.listener.as_raw_fd(),
825 &mut addr as *mut _ as *mut libc::sockaddr,
826 &mut len,
827 libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
828 )
829 }
830
831 #[cfg(not(target_os = "linux"))]
832 {
833 let fd = libc::accept(
835 self.listener.as_raw_fd(),
836 &mut addr as *mut _ as *mut libc::sockaddr,
837 &mut len,
838 );
839
840 if fd >= 0 {
841 if libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) < 0 {
843 libc::close(fd);
844 return Poll::Ready(Err(io::Error::last_os_error()));
845 }
846
847 let flags = libc::fcntl(fd, libc::F_GETFL);
849 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
850 libc::close(fd);
851 return Poll::Ready(Err(io::Error::last_os_error()));
852 }
853 }
854
855 fd
856 }
857 };
858
859 if fd < 0 {
860 let err = io::Error::last_os_error();
861 if err.kind() == io::ErrorKind::WouldBlock {
862 return Poll::Pending;
863 }
864 return Poll::Ready(Err(err));
865 }
866
867 let stream = match unsafe { TcpStream::from_raw_fd(fd) } {
868 Ok(s) => s,
869 Err(e) => return Poll::Ready(Err(e)),
870 };
871
872 let peer_addr = match self.listener.local_addr() {
875 Ok(_) => DUMMY_ADDR,
876 Err(_) => return Poll::Ready(Err(io::Error::last_os_error())),
877 };
878
879 Poll::Ready(Ok((stream, peer_addr)))
880 }
881
882 #[cfg(not(unix))]
883 {
884 Poll::Ready(Err(io::Error::new(
885 io::ErrorKind::Unsupported,
886 "TCP accept not yet implemented on this platform",
887 )))
888 }
889 }
890}
891
892pub struct UdpSocket {
914 fd: std::os::fd::OwnedFd,
916}
917
918impl UdpSocket {
919 pub fn bind(addr: &str) -> BindUdpFuture {
933 let Ok(addr) = addr.parse::<SocketAddr>() else {
934 return BindUdpFuture::Error(io::Error::new(
935 io::ErrorKind::InvalidInput,
936 "Invalid address format, use IP:PORT",
937 ));
938 };
939
940 BindUdpFuture::Binding(BindingUdpState { addr })
941 }
942
943 pub fn recv_from<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> RecvFromFuture<'a, 'b> {
949 RecvFromFuture {
950 stream: Some(self),
951 buf,
952 }
953 }
954
955 pub fn send_to<'a, 'b>(&'a mut self, buf: &'b [u8], addr: SocketAddr) -> SendToFuture<'a, 'b> {
961 SendToFuture {
962 stream: Some(self),
963 buf,
964 addr,
965 }
966 }
967
968 pub fn connect(&mut self, addr: SocketAddr) -> ConnectUdpFuture {
974 ConnectUdpFuture {
975 fd: self.fd.as_raw_fd(),
976 addr,
977 done: false,
978 }
979 }
980}
981
982impl AsRawFd for UdpSocket {
983 fn as_raw_fd(&self) -> RawFd {
984 self.fd.as_raw_fd()
985 }
986}
987
988pub enum BindUdpFuture {
991 Error(io::Error),
993 Binding(BindingUdpState),
995 Done,
997}
998
999struct BindingUdpState {
1000 addr: SocketAddr,
1001}
1002
1003impl Future for BindUdpFuture {
1004 type Output = io::Result<UdpSocket>;
1005
1006 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1007 match &mut *self {
1008 BindUdpFuture::Error(e) => {
1009 let e = std::mem::replace(e, io::Error::other(""));
1010 Poll::Ready(Err(e))
1011 },
1012 BindUdpFuture::Done => panic!("BindUdpFuture polled after completion"),
1013 BindUdpFuture::Binding(state) => {
1014 let fd = create_udp_socket(state.addr.is_ipv4());
1017
1018 if fd < 0 {
1019 return Poll::Ready(Err(io::Error::last_os_error()));
1020 }
1021
1022 let result = do_bind_udp(fd, state.addr);
1025 if result < 0 {
1026 let err = io::Error::last_os_error();
1027 unsafe { libc::close(fd) };
1028 return Poll::Ready(Err(err));
1029 }
1030
1031 let socket = UdpSocket {
1032 fd: unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) },
1034 };
1035
1036 *self = BindUdpFuture::Done;
1037 Poll::Ready(Ok(socket))
1038 },
1039 }
1040 }
1041}
1042
1043#[cfg(unix)]
1046fn create_udp_socket(ipv4: bool) -> RawFd {
1047 unsafe {
1048 let domain = if ipv4 { libc::AF_INET } else { libc::AF_INET6 };
1049
1050 #[cfg(target_os = "linux")]
1051 let fd =
1052 libc::socket(domain, libc::SOCK_DGRAM | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, 0);
1053
1054 #[cfg(not(target_os = "linux"))]
1055 let fd = libc::socket(domain, libc::SOCK_DGRAM, 0);
1056
1057 if fd < 0 {
1058 return fd;
1059 }
1060
1061 #[cfg(not(target_os = "linux"))]
1062 {
1063 if libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) < 0 {
1065 libc::close(fd);
1066 return -1;
1067 }
1068
1069 let flags = libc::fcntl(fd, libc::F_GETFL);
1071 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
1072 libc::close(fd);
1073 return -1;
1074 }
1075 }
1076
1077 fd
1078 }
1079}
1080
1081#[cfg(unix)]
1084fn do_bind_udp(fd: RawFd, addr: SocketAddr) -> i32 {
1085 unsafe {
1086 if addr.is_ipv4() {
1087 if let SocketAddr::V4(v4) = addr {
1088 #[cfg(target_os = "linux")]
1089 let sockaddr = libc::sockaddr_in {
1090 sin_family: libc::AF_INET as u16,
1091 sin_port: v4.port().to_be(),
1092 sin_addr: libc::in_addr {
1093 s_addr: u32::from_ne_bytes(v4.ip().octets()),
1094 },
1095 sin_zero: [0; 8],
1096 };
1097
1098 #[cfg(not(target_os = "linux"))]
1099 let sockaddr = libc::sockaddr_in {
1100 sin_len: size_of::<libc::sockaddr_in>() as u8,
1101 sin_family: libc::AF_INET as u8,
1102 sin_port: v4.port().to_be(),
1103 sin_addr: libc::in_addr {
1104 s_addr: u32::from_ne_bytes(v4.ip().octets()),
1105 },
1106 sin_zero: [0; 8],
1107 };
1108
1109 libc::bind(
1110 fd,
1111 &sockaddr as *const _ as *const libc::sockaddr,
1112 size_of::<libc::sockaddr_in>() as libc::socklen_t,
1113 )
1114 } else {
1115 -1
1116 }
1117 } else {
1118 if let SocketAddr::V6(v6) = addr {
1119 #[cfg(target_os = "linux")]
1120 let sockaddr = libc::sockaddr_in6 {
1121 sin6_family: libc::AF_INET6 as u16,
1122 sin6_port: v6.port().to_be(),
1123 sin6_flowinfo: v6.flowinfo(),
1124 sin6_addr: libc::in6_addr {
1125 s6_addr: v6.ip().octets(),
1126 },
1127 sin6_scope_id: v6.scope_id(),
1128 };
1129
1130 #[cfg(not(target_os = "linux"))]
1131 let sockaddr = libc::sockaddr_in6 {
1132 sin6_len: size_of::<libc::sockaddr_in6>() as u8,
1133 sin6_family: libc::AF_INET6 as u8,
1134 sin6_port: v6.port().to_be(),
1135 sin6_flowinfo: v6.flowinfo(),
1136 sin6_addr: libc::in6_addr {
1137 s6_addr: v6.ip().octets(),
1138 },
1139 sin6_scope_id: v6.scope_id(),
1140 };
1141
1142 libc::bind(
1143 fd,
1144 &sockaddr as *const _ as *const libc::sockaddr,
1145 size_of::<libc::sockaddr_in6>() as libc::socklen_t,
1146 )
1147 } else {
1148 -1
1149 }
1150 }
1151 }
1152}
1153
1154pub struct RecvFromFuture<'a, 'b> {
1157 stream: Option<&'a mut UdpSocket>,
1158 buf: &'b mut [u8],
1159}
1160
1161impl Future for RecvFromFuture<'_, '_> {
1162 type Output = io::Result<(usize, SocketAddr)>;
1163
1164 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1165 let stream_fd;
1168 let buf_ptr;
1169 let buf_len;
1170
1171 {
1172 let stream = self.stream.as_mut().unwrap();
1173 stream_fd = stream.as_raw_fd();
1174 buf_ptr = self.buf.as_mut_ptr();
1175 buf_len = self.buf.len();
1176 }
1177
1178 #[cfg(unix)]
1179 {
1180 let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
1181 let mut addr_len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
1182
1183 let result = unsafe {
1184 libc::recvfrom(
1185 stream_fd,
1186 buf_ptr as *mut _,
1187 buf_len,
1188 0,
1189 &mut addr as *mut _ as *mut libc::sockaddr,
1190 &mut addr_len,
1191 )
1192 };
1193
1194 if result < 0 {
1195 let err = io::Error::last_os_error();
1196 if err.kind() == io::ErrorKind::WouldBlock {
1197 return Poll::Pending;
1198 }
1199 return Poll::Ready(Err(err));
1200 }
1201
1202 let n = result as usize;
1203
1204 let peer_addr = SocketAddr::V4(std::net::SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
1207
1208 Poll::Ready(Ok((n, peer_addr)))
1209 }
1210
1211 #[cfg(not(unix))]
1212 {
1213 let _ = (stream_fd, buf_ptr, buf_len);
1214 Poll::Ready(Err(io::Error::new(
1215 io::ErrorKind::Unsupported,
1216 "UDP recv_from not yet implemented on this platform",
1217 )))
1218 }
1219 }
1220}
1221
1222pub struct SendToFuture<'a, 'b> {
1225 stream: Option<&'a mut UdpSocket>,
1226 buf: &'b [u8],
1227 addr: SocketAddr,
1228}
1229
1230impl Future for SendToFuture<'_, '_> {
1231 type Output = io::Result<usize>;
1232
1233 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1234 let stream = self.stream.as_mut().unwrap();
1235 let stream_fd = stream.as_raw_fd();
1236
1237 #[cfg(unix)]
1238 {
1239 let result = match self.addr {
1240 SocketAddr::V4(v4) => {
1241 let sockaddr = libc::sockaddr_in {
1242 #[cfg(target_os = "macos")]
1243 sin_len: size_of::<libc::sockaddr_in>() as u8,
1244 sin_family: libc::AF_INET as _,
1245 sin_port: v4.port().to_be(),
1246 sin_addr: libc::in_addr {
1247 s_addr: u32::from(*v4.ip()).to_be(),
1248 },
1249 sin_zero: [0; 8],
1250 };
1251 unsafe {
1252 libc::sendto(
1253 stream_fd,
1254 self.buf.as_ptr() as *const _,
1255 self.buf.len(),
1256 0,
1257 &sockaddr as *const _ as *const _,
1258 size_of::<libc::sockaddr_in>() as libc::socklen_t,
1259 )
1260 }
1261 },
1262 SocketAddr::V6(v6) => {
1263 let sockaddr = libc::sockaddr_in6 {
1264 sin6_family: libc::AF_INET6 as _,
1265 sin6_port: v6.port().to_be(),
1266 sin6_flowinfo: v6.flowinfo().to_be(),
1267 sin6_addr: libc::in6_addr {
1268 s6_addr: v6.ip().octets(),
1269 },
1270 sin6_scope_id: v6.scope_id(),
1271 #[cfg(target_os = "macos")]
1272 sin6_len: size_of::<libc::sockaddr_in6>() as u8,
1273 };
1274 unsafe {
1275 libc::sendto(
1276 stream_fd,
1277 self.buf.as_ptr() as *const _,
1278 self.buf.len(),
1279 0,
1280 &sockaddr as *const _ as *const _,
1281 size_of::<libc::sockaddr_in6>() as libc::socklen_t,
1282 )
1283 }
1284 },
1285 };
1286
1287 if result < 0 {
1288 let err = io::Error::last_os_error();
1289 if err.kind() == io::ErrorKind::WouldBlock {
1290 return Poll::Pending;
1291 }
1292 return Poll::Ready(Err(err));
1293 }
1294
1295 let n = result as usize;
1296 Poll::Ready(Ok(n))
1297 }
1298
1299 #[cfg(not(unix))]
1300 {
1301 let _ = stream_fd;
1302 Poll::Ready(Err(io::Error::new(
1303 io::ErrorKind::Unsupported,
1304 "UDP send_to not yet implemented on this platform",
1305 )))
1306 }
1307 }
1308}
1309
1310pub struct ConnectUdpFuture {
1313 fd: RawFd,
1314 addr: SocketAddr,
1315 done: bool,
1316}
1317
1318impl Future for ConnectUdpFuture {
1319 type Output = io::Result<()>;
1320
1321 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
1322 assert!(!self.done, "ConnectUdpFuture polled after completion");
1323
1324 #[cfg(unix)]
1327 {
1328 let result = unsafe {
1329 match self.addr {
1330 SocketAddr::V4(v4) => {
1331 #[cfg(target_os = "linux")]
1332 let sockaddr = libc::sockaddr_in {
1333 sin_family: libc::AF_INET as u16,
1334 sin_port: v4.port().to_be(),
1335 sin_addr: libc::in_addr {
1336 s_addr: u32::from_ne_bytes(v4.ip().octets()),
1337 },
1338 sin_zero: [0; 8],
1339 };
1340
1341 #[cfg(not(target_os = "linux"))]
1342 let sockaddr = libc::sockaddr_in {
1343 sin_len: size_of::<libc::sockaddr_in>() as u8,
1344 sin_family: libc::AF_INET as u8,
1345 sin_port: v4.port().to_be(),
1346 sin_addr: libc::in_addr {
1347 s_addr: u32::from_ne_bytes(v4.ip().octets()),
1348 },
1349 sin_zero: [0; 8],
1350 };
1351
1352 libc::connect(
1353 self.fd,
1354 &sockaddr as *const _ as *const libc::sockaddr,
1355 size_of::<libc::sockaddr_in>() as libc::socklen_t,
1356 )
1357 },
1358 SocketAddr::V6(v6) => {
1359 #[cfg(target_os = "linux")]
1360 let sockaddr = libc::sockaddr_in6 {
1361 sin6_family: libc::AF_INET6 as u16,
1362 sin6_port: v6.port().to_be(),
1363 sin6_flowinfo: v6.flowinfo(),
1364 sin6_addr: libc::in6_addr {
1365 s6_addr: v6.ip().octets(),
1366 },
1367 sin6_scope_id: v6.scope_id(),
1368 };
1369
1370 #[cfg(not(target_os = "linux"))]
1371 let sockaddr = libc::sockaddr_in6 {
1372 sin6_len: size_of::<libc::sockaddr_in6>() as u8,
1373 sin6_family: libc::AF_INET6 as u8,
1374 sin6_port: v6.port().to_be(),
1375 sin6_flowinfo: v6.flowinfo(),
1376 sin6_addr: libc::in6_addr {
1377 s6_addr: v6.ip().octets(),
1378 },
1379 sin6_scope_id: v6.scope_id(),
1380 };
1381
1382 libc::connect(
1383 self.fd,
1384 &sockaddr as *const _ as *const libc::sockaddr,
1385 size_of::<libc::sockaddr_in6>() as libc::socklen_t,
1386 )
1387 },
1388 }
1389 };
1390
1391 if result < 0 {
1392 return Poll::Ready(Err(io::Error::last_os_error()));
1393 }
1394 }
1395
1396 #[cfg(not(unix))]
1397 {
1398 let _ = (self.fd, self.addr);
1399 return Poll::Ready(Err(io::Error::new(
1400 io::ErrorKind::Unsupported,
1401 "UDP connect not yet implemented on this platform",
1402 )));
1403 }
1404
1405 self.done = true;
1406 Poll::Ready(Ok(()))
1407 }
1408}
1409
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrapping a descriptor that is not open must fail instead of producing a stream.
    #[test]
    fn test_tcp_stream_create() {
        // SAFETY: -1 never names an open descriptor, so nothing can be aliased or closed.
        let outcome = unsafe { TcpStream::from_raw_fd(-1) };
        assert!(outcome.is_err());
    }

    /// An unparsable address must produce the `Error` variant without touching the network.
    #[test]
    fn test_tcp_listener_bind_invalid() {
        let future = TcpListener::bind("invalid_address");
        if !matches!(future, BindFuture::Error(_)) {
            panic!("Expected Error future");
        }
    }

    /// An unparsable address must produce the `Error` variant without touching the network.
    #[test]
    fn test_connect_invalid_addr() {
        let future = TcpStream::connect("not_an_address");
        if !matches!(future, ConnectFuture::Error(_)) {
            panic!("Expected Error future for invalid address");
        }
    }
}