#![allow(private_interfaces)]
use std::future::Future;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr};
use std::os::fd::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll};
/// Unspecified IPv4 address (`0.0.0.0`) with port 0.
///
/// Returned as a stand-in by the parts of this module that do not report a real socket
/// address.
const DUMMY_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
pub struct TcpStream {
fd: std::os::fd::OwnedFd,
}
impl TcpStream {
pub(crate) unsafe fn from_raw_fd(fd: RawFd) -> io::Result<Self> {
#[cfg(unix)]
unsafe {
let flags = libc::fcntl(fd, libc::F_GETFL);
if flags < 0 {
return Err(io::Error::last_os_error());
}
if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
return Err(io::Error::last_os_error());
}
}
Ok(Self {
fd: unsafe { std::os::fd::OwnedFd::from_raw_fd(fd) },
})
}
pub fn connect(addr: &str) -> ConnectFuture {
let Ok(addr) = addr.parse::<SocketAddr>() else {
return ConnectFuture::Error(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid address format, use IP:PORT",
));
};
ConnectFuture::Connecting(Box::new(ConnectingState {
addr,
fd: None,
started: false,
}))
}
pub fn read<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> ReadFuture<'a, 'b> {
ReadFuture {
stream: Some(self),
buf,
pos: 0,
}
}
pub fn write_all<'a, 'b>(&'a mut self, buf: &'b [u8]) -> WriteAllFuture<'a, 'b> {
WriteAllFuture {
stream: Some(self),
buf,
pos: 0,
}
}
pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
unsafe {
let ptr = self as *mut TcpStream;
(ReadHalf { _stream: &mut *ptr }, WriteHalf { _stream: &mut *ptr })
}
}
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
#[cfg(unix)]
unsafe {
let how = match how {
Shutdown::Read => libc::SHUT_RD,
Shutdown::Write => libc::SHUT_WR,
Shutdown::Both => libc::SHUT_RDWR,
};
if libc::shutdown(self.as_raw_fd(), how) < 0 {
return Err(io::Error::last_os_error());
}
}
#[cfg(not(unix))]
{
let _ = how;
return Err(io::Error::new(
io::ErrorKind::Unsupported,
"Shutdown not supported on this platform",
));
}
Ok(())
}
}
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
pub enum ConnectFuture {
Error(io::Error),
Connecting(Box<ConnectingState>),
Done,
}
struct ConnectingState {
addr: SocketAddr,
fd: Option<RawFd>,
started: bool,
}
impl Future for ConnectFuture {
type Output = io::Result<TcpStream>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut *self {
ConnectFuture::Error(e) => {
let e = std::mem::replace(e, io::Error::other(""));
Poll::Ready(Err(e))
},
ConnectFuture::Done => panic!("ConnectFuture polled after completion"),
ConnectFuture::Connecting(state) => {
if !state.started {
state.started = true;
let fd: RawFd = create_socket(state.addr.is_ipv4());
if fd < 0 {
return Poll::Ready(Err(io::Error::last_os_error()));
}
let result = do_connect(fd, state.addr);
if result < 0 {
let err = io::Error::last_os_error();
if err.kind() != io::ErrorKind::WouldBlock {
unsafe { libc::close(fd) };
return Poll::Ready(Err(err));
}
state.fd = Some(fd);
return Poll::Pending;
}
state.fd = Some(fd);
}
if let Some(fd) = state.fd {
let mut pfd = libc::pollfd {
fd,
events: libc::POLLOUT,
revents: 0,
};
let ready = unsafe { libc::poll(&mut pfd, 1, 0) };
if ready < 0 {
let fd = state.fd.take().unwrap();
unsafe { libc::close(fd) };
return Poll::Ready(Err(io::Error::last_os_error()));
}
if ready == 0 {
cx.waker().wake_by_ref();
return Poll::Pending;
}
let mut err_val: libc::c_int = 0;
let mut err_len: libc::socklen_t =
size_of::<libc::c_int>() as libc::socklen_t;
unsafe {
libc::getsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_ERROR,
&mut err_val as *mut _ as *mut _,
&mut err_len,
);
}
if err_val != 0 {
let fd = state.fd.take().unwrap();
unsafe { libc::close(fd) };
return Poll::Ready(Err(io::Error::from_raw_os_error(err_val)));
}
let fd = state.fd.take().unwrap();
let stream = match unsafe { TcpStream::from_raw_fd(fd) } {
Ok(s) => s,
Err(e) => return Poll::Ready(Err(e)),
};
*self = ConnectFuture::Done;
Poll::Ready(Ok(stream))
} else {
Poll::Pending
}
},
}
}
}
/// Creates a non-blocking, close-on-exec TCP socket for the IPv4 or IPv6 family.
///
/// Returns the new descriptor, or a negative value when the socket could not be created or
/// configured; a descriptor that could not be configured is closed before returning.
#[cfg(unix)]
fn create_socket(ipv4: bool) -> RawFd {
    let domain = if ipv4 { libc::AF_INET } else { libc::AF_INET6 };
    #[cfg(target_os = "linux")]
    {
        // Linux can set both flags atomically at creation, as `create_udp_socket` does.
        // SAFETY: socket(2) only takes integer arguments.
        unsafe {
            libc::socket(
                domain,
                libc::SOCK_STREAM | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
                0,
            )
        }
    }
    #[cfg(not(target_os = "linux"))]
    {
        // SAFETY: every call below operates on the descriptor that was just created.
        unsafe {
            let fd = libc::socket(domain, libc::SOCK_STREAM, 0);
            if fd < 0 {
                return fd;
            }
            let configured = libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) >= 0 && {
                // F_GETFL can fail too; OR-ing O_NONBLOCK into -1 would pass garbage flags.
                let flags = libc::fcntl(fd, libc::F_GETFL);
                flags >= 0 && libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) >= 0
            };
            if !configured {
                libc::close(fd);
                return -1;
            }
            fd
        }
    }
}
/// Issues `connect(2)` on `fd` towards `addr` and returns the raw result of the call.
///
/// A negative return value means the call failed and the reason is in `errno`.
#[cfg(unix)]
fn do_connect(fd: RawFd, addr: SocketAddr) -> i32 {
    match addr {
        SocketAddr::V4(v4) => {
            let target = libc::sockaddr_in {
                // Only non-Linux targets are assumed to carry the BSD-style length field.
                #[cfg(not(target_os = "linux"))]
                sin_len: size_of::<libc::sockaddr_in>() as u8,
                sin_family: libc::AF_INET as libc::sa_family_t,
                sin_port: v4.port().to_be(),
                sin_addr: libc::in_addr {
                    // The octets are already in network order; keep their memory layout.
                    s_addr: u32::from_ne_bytes(v4.ip().octets()),
                },
                sin_zero: [0; 8],
            };
            let target_ptr = &target as *const libc::sockaddr_in as *const libc::sockaddr;
            let target_len = size_of::<libc::sockaddr_in>() as libc::socklen_t;
            // SAFETY: `target_ptr` points at a live `sockaddr_in` of `target_len` bytes.
            unsafe { libc::connect(fd, target_ptr, target_len) }
        },
        SocketAddr::V6(v6) => {
            let target = libc::sockaddr_in6 {
                #[cfg(not(target_os = "linux"))]
                sin6_len: size_of::<libc::sockaddr_in6>() as u8,
                sin6_family: libc::AF_INET6 as libc::sa_family_t,
                sin6_port: v6.port().to_be(),
                sin6_flowinfo: v6.flowinfo(),
                sin6_addr: libc::in6_addr {
                    s6_addr: v6.ip().octets(),
                },
                sin6_scope_id: v6.scope_id(),
            };
            let target_ptr = &target as *const libc::sockaddr_in6 as *const libc::sockaddr;
            let target_len = size_of::<libc::sockaddr_in6>() as libc::socklen_t;
            // SAFETY: `target_ptr` points at a live `sockaddr_in6` of `target_len` bytes.
            unsafe { libc::connect(fd, target_ptr, target_len) }
        },
    }
}
/// Future returned by [`TcpStream::read`]; resolves to the number of bytes read.
pub struct ReadFuture<'a, 'b> {
    /// Stream being read from.
    stream: Option<&'a mut TcpStream>,
    /// Destination buffer.
    buf: &'b mut [u8],
    /// Offset into `buf` at which the next read stores its data.
    pos: usize,
}

impl Future for ReadFuture<'_, '_> {
    type Output = io::Result<usize>;

    /// Performs one non-blocking `read(2)`.
    ///
    /// Resolves to the byte count of that read (`0` when the read returned no data) or to
    /// the OS error. While the socket has no data the future asks to be polled again.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let fd = this
            .stream
            .as_ref()
            .expect("ReadFuture is always created with a stream")
            .as_raw_fd();
        let dst = &mut this.buf[this.pos..];
        #[cfg(unix)]
        {
            // SAFETY: `dst` is a live, writable slice and its exact length is passed along.
            let rc = unsafe { libc::read(fd, dst.as_mut_ptr() as *mut libc::c_void, dst.len()) };
            if rc < 0 {
                let err = io::Error::last_os_error();
                return match err.kind() {
                    io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
                        // Nothing registers the socket with a reactor, so returning
                        // `Pending` without a wake-up could stall the task forever.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    },
                    _ => Poll::Ready(Err(err)),
                };
            }
            let n = rc as usize;
            this.pos += n;
            Poll::Ready(Ok(n))
        }
        #[cfg(not(unix))]
        {
            let _ = (fd, dst, cx);
            Poll::Ready(Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "TCP read not yet implemented on this platform",
            )))
        }
    }
}
/// Future returned by [`TcpStream::write_all`]; resolves once every byte has been written.
pub struct WriteAllFuture<'a, 'b> {
    /// Stream being written to.
    stream: Option<&'a mut TcpStream>,
    /// Data to send.
    buf: &'b [u8],
    /// Number of bytes of `buf` already written.
    pos: usize,
}

impl Future for WriteAllFuture<'_, '_> {
    type Output = io::Result<()>;

    /// Writes as much of the remaining data as the socket accepts.
    ///
    /// Progress is kept in `pos`, so a later poll continues where this one stopped. A write
    /// that accepts zero bytes is reported as `WriteZero`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let fd = this
            .stream
            .as_ref()
            .expect("WriteAllFuture is always created with a stream")
            .as_raw_fd();
        while this.pos < this.buf.len() {
            #[cfg(unix)]
            {
                let pending = &this.buf[this.pos..];
                // SAFETY: `pending` is a live slice and its exact length is passed along.
                let rc = unsafe {
                    libc::write(fd, pending.as_ptr() as *const libc::c_void, pending.len())
                };
                if rc < 0 {
                    let err = io::Error::last_os_error();
                    return match err.kind() {
                        io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
                            // Nothing registers the socket with a reactor, so returning
                            // `Pending` without a wake-up could stall the task forever.
                            cx.waker().wake_by_ref();
                            Poll::Pending
                        },
                        _ => Poll::Ready(Err(err)),
                    };
                }
                if rc == 0 {
                    return Poll::Ready(Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "write zero byte",
                    )));
                }
                this.pos += rc as usize;
            }
            #[cfg(not(unix))]
            {
                let _ = (fd, cx);
                return Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::Unsupported,
                    "TCP write not yet implemented on this platform",
                )));
            }
        }
        Poll::Ready(Ok(()))
    }
}
/// Read half produced by `TcpStream::split`.
///
/// It only stores a mutable borrow of the stream; no operations are defined on it here.
// NOTE(review): `split` creates this borrow next to the one stored in `WriteHalf`, so two
// `&mut` to the same stream exist at once — confirm and rework before adding methods.
pub struct ReadHalf<'a> {
_stream: &'a mut TcpStream,
}
/// Write half produced by `TcpStream::split`.
///
/// It only stores a mutable borrow of the stream; no operations are defined on it here.
// NOTE(review): `split` creates this borrow next to the one stored in `ReadHalf`, so two
// `&mut` to the same stream exist at once — confirm and rework before adding methods.
pub struct WriteHalf<'a> {
_stream: &'a mut TcpStream,
}
pub struct TcpListener {
fd: std::os::fd::OwnedFd,
}
impl TcpListener {
pub fn bind(addr: &str) -> BindFuture {
let Ok(addr) = addr.parse::<SocketAddr>() else {
return BindFuture::Error(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid address format, use IP:PORT",
));
};
BindFuture::Binding(BindingState { addr })
}
pub fn accept(&mut self) -> AcceptFuture<'_> {
AcceptFuture { listener: self }
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
#[cfg(unix)]
unsafe {
let mut addr: libc::sockaddr_storage = std::mem::zeroed();
let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
if libc::getsockname(
self.as_raw_fd(),
&mut addr as *mut _ as *mut libc::sockaddr,
&mut len,
) < 0
{
return Err(io::Error::last_os_error());
}
Ok(DUMMY_ADDR)
}
#[cfg(not(unix))]
{
Err(io::Error::new(
io::ErrorKind::Unsupported,
"local_addr not supported on this platform",
))
}
}
}
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
/// Future returned by [`TcpListener::bind`].
pub enum BindFuture {
    /// The address could not be parsed; the stored error is handed out when polled.
    Error(io::Error),
    /// The listener has not been created yet.
    Binding(BindingState),
    /// The listener was already handed out.
    Done,
}

/// Parameters of a pending bind.
struct BindingState {
    /// Address to bind to.
    addr: SocketAddr,
}

impl Future for BindFuture {
    type Output = io::Result<TcpListener>;

    /// Creates the socket, enables `SO_REUSEADDR`, binds it and starts listening, all
    /// within a single poll.
    ///
    /// # Panics
    ///
    /// Panics when polled again after the listener was returned.
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let addr = match &mut *self {
            BindFuture::Error(e) => {
                let e = std::mem::replace(e, io::Error::other(""));
                return Poll::Ready(Err(e));
            },
            BindFuture::Done => panic!("BindFuture polled after completion"),
            BindFuture::Binding(state) => state.addr,
        };
        #[cfg(unix)]
        {
            let raw = create_socket(addr.is_ipv4());
            if raw < 0 {
                return Poll::Ready(Err(io::Error::last_os_error()));
            }
            // Own the descriptor right away: every early return below builds its error
            // from errno first and only then drops (closes) the socket, so close(2) can
            // never overwrite the errno being reported.
            // SAFETY: `create_socket` returned a fresh descriptor that nothing else owns.
            let sock = unsafe { std::os::fd::OwnedFd::from_raw_fd(raw) };

            let reuse: libc::c_int = 1;
            // SAFETY: the option pointer references a live `c_int` of the stated size.
            let rc = unsafe {
                libc::setsockopt(
                    raw,
                    libc::SOL_SOCKET,
                    libc::SO_REUSEADDR,
                    &reuse as *const libc::c_int as *const libc::c_void,
                    size_of::<libc::c_int>() as libc::socklen_t,
                )
            };
            if rc < 0 {
                return Poll::Ready(Err(io::Error::last_os_error()));
            }
            if do_bind(raw, addr) < 0 {
                return Poll::Ready(Err(io::Error::last_os_error()));
            }
            // SAFETY: `raw` is the open socket owned by `sock`.
            let rc = unsafe { libc::listen(raw, 128) };
            if rc < 0 {
                return Poll::Ready(Err(io::Error::last_os_error()));
            }
            *self = BindFuture::Done;
            Poll::Ready(Ok(TcpListener { fd: sock }))
        }
        #[cfg(not(unix))]
        {
            let _ = addr;
            Poll::Ready(Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "TCP bind not yet implemented on this platform",
            )))
        }
    }
}
/// Issues `bind(2)` on `fd` for `addr` and returns the raw result of the call.
///
/// A negative return value means the call failed and the reason is in `errno`.
#[cfg(unix)]
fn do_bind(fd: RawFd, addr: SocketAddr) -> i32 {
    match addr {
        SocketAddr::V4(v4) => {
            let local = libc::sockaddr_in {
                // Only non-Linux targets are assumed to carry the BSD-style length field.
                #[cfg(not(target_os = "linux"))]
                sin_len: size_of::<libc::sockaddr_in>() as u8,
                sin_family: libc::AF_INET as libc::sa_family_t,
                sin_port: v4.port().to_be(),
                sin_addr: libc::in_addr {
                    // The octets are already in network order; keep their memory layout.
                    s_addr: u32::from_ne_bytes(v4.ip().octets()),
                },
                sin_zero: [0; 8],
            };
            let local_ptr = &local as *const libc::sockaddr_in as *const libc::sockaddr;
            let local_len = size_of::<libc::sockaddr_in>() as libc::socklen_t;
            // SAFETY: `local_ptr` points at a live `sockaddr_in` of `local_len` bytes.
            unsafe { libc::bind(fd, local_ptr, local_len) }
        },
        SocketAddr::V6(v6) => {
            let local = libc::sockaddr_in6 {
                #[cfg(not(target_os = "linux"))]
                sin6_len: size_of::<libc::sockaddr_in6>() as u8,
                sin6_family: libc::AF_INET6 as libc::sa_family_t,
                sin6_port: v6.port().to_be(),
                sin6_flowinfo: v6.flowinfo(),
                sin6_addr: libc::in6_addr {
                    s6_addr: v6.ip().octets(),
                },
                sin6_scope_id: v6.scope_id(),
            };
            let local_ptr = &local as *const libc::sockaddr_in6 as *const libc::sockaddr;
            let local_len = size_of::<libc::sockaddr_in6>() as libc::socklen_t;
            // SAFETY: `local_ptr` points at a live `sockaddr_in6` of `local_len` bytes.
            unsafe { libc::bind(fd, local_ptr, local_len) }
        },
    }
}
pub struct AcceptFuture<'a> {
listener: &'a mut TcpListener,
}
impl Future for AcceptFuture<'_> {
type Output = io::Result<(TcpStream, SocketAddr)>;
#[allow(unused_mut)]
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(unix)]
{
let mut addr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
let mut len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
let fd = unsafe {
#[cfg(target_os = "linux")]
{
libc::accept4(
self.listener.as_raw_fd(),
&mut addr as *mut _ as *mut libc::sockaddr,
&mut len,
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
)
}
#[cfg(not(target_os = "linux"))]
{
let fd = libc::accept(
self.listener.as_raw_fd(),
&mut addr as *mut _ as *mut libc::sockaddr,
&mut len,
);
if fd >= 0 {
if libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) < 0 {
libc::close(fd);
return Poll::Ready(Err(io::Error::last_os_error()));
}
let flags = libc::fcntl(fd, libc::F_GETFL);
if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
libc::close(fd);
return Poll::Ready(Err(io::Error::last_os_error()));
}
}
fd
}
};
if fd < 0 {
let err = io::Error::last_os_error();
if err.kind() == io::ErrorKind::WouldBlock {
return Poll::Pending;
}
return Poll::Ready(Err(err));
}
let stream = match unsafe { TcpStream::from_raw_fd(fd) } {
Ok(s) => s,
Err(e) => return Poll::Ready(Err(e)),
};
let peer_addr = match self.listener.local_addr() {
Ok(_) => DUMMY_ADDR,
Err(_) => return Poll::Ready(Err(io::Error::last_os_error())),
};
Poll::Ready(Ok((stream, peer_addr)))
}
#[cfg(not(unix))]
{
Poll::Ready(Err(io::Error::new(
io::ErrorKind::Unsupported,
"TCP accept not yet implemented on this platform",
)))
}
}
}
pub struct UdpSocket {
fd: std::os::fd::OwnedFd,
}
impl UdpSocket {
pub fn bind(addr: &str) -> BindUdpFuture {
let Ok(addr) = addr.parse::<SocketAddr>() else {
return BindUdpFuture::Error(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid address format, use IP:PORT",
));
};
BindUdpFuture::Binding(BindingUdpState { addr })
}
pub fn recv_from<'a, 'b>(&'a mut self, buf: &'b mut [u8]) -> RecvFromFuture<'a, 'b> {
RecvFromFuture {
stream: Some(self),
buf,
}
}
pub fn send_to<'a, 'b>(&'a mut self, buf: &'b [u8], addr: SocketAddr) -> SendToFuture<'a, 'b> {
SendToFuture {
stream: Some(self),
buf,
addr,
}
}
pub fn connect(&mut self, addr: SocketAddr) -> ConnectUdpFuture {
ConnectUdpFuture {
fd: self.fd.as_raw_fd(),
addr,
done: false,
}
}
}
impl AsRawFd for UdpSocket {
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
/// Future returned by [`UdpSocket::bind`].
pub enum BindUdpFuture {
    /// The address could not be parsed; the stored error is handed out when polled.
    Error(io::Error),
    /// The socket has not been created yet.
    Binding(BindingUdpState),
    /// The socket was already handed out.
    Done,
}

/// Parameters of a pending UDP bind.
struct BindingUdpState {
    /// Address to bind to.
    addr: SocketAddr,
}

impl Future for BindUdpFuture {
    type Output = io::Result<UdpSocket>;

    /// Creates the UDP socket and binds it within a single poll.
    ///
    /// # Panics
    ///
    /// Panics when polled again after the socket was returned.
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let addr = match &mut *self {
            BindUdpFuture::Error(e) => {
                let e = std::mem::replace(e, io::Error::other(""));
                return Poll::Ready(Err(e));
            },
            BindUdpFuture::Done => panic!("BindUdpFuture polled after completion"),
            BindUdpFuture::Binding(state) => state.addr,
        };

        let raw = create_udp_socket(addr.is_ipv4());
        if raw < 0 {
            return Poll::Ready(Err(io::Error::last_os_error()));
        }
        if do_bind_udp(raw, addr) < 0 {
            // Capture errno before close(2) can overwrite it.
            let bind_err = io::Error::last_os_error();
            // SAFETY: `raw` is the socket created above and is not owned by anything yet.
            unsafe { libc::close(raw) };
            return Poll::Ready(Err(bind_err));
        }

        // SAFETY: `raw` is an open descriptor that nothing else owns.
        let fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(raw) };
        *self = BindUdpFuture::Done;
        Poll::Ready(Ok(UdpSocket { fd }))
    }
}
/// Creates a non-blocking, close-on-exec UDP socket for the IPv4 or IPv6 family.
///
/// Returns the new descriptor, or a negative value when the socket could not be created or
/// configured; a descriptor that could not be configured is closed before returning.
#[cfg(unix)]
fn create_udp_socket(ipv4: bool) -> RawFd {
    let domain = if ipv4 { libc::AF_INET } else { libc::AF_INET6 };
    #[cfg(target_os = "linux")]
    {
        // Linux sets both flags atomically at creation time.
        // SAFETY: socket(2) only takes integer arguments.
        unsafe {
            libc::socket(
                domain,
                libc::SOCK_DGRAM | libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
                0,
            )
        }
    }
    #[cfg(not(target_os = "linux"))]
    {
        // SAFETY: every call below operates on the descriptor that was just created.
        unsafe {
            let fd = libc::socket(domain, libc::SOCK_DGRAM, 0);
            if fd < 0 {
                return fd;
            }
            let configured = libc::fcntl(fd, libc::F_SETFD, libc::FD_CLOEXEC) >= 0 && {
                // F_GETFL can fail too; OR-ing O_NONBLOCK into -1 would pass garbage flags.
                let flags = libc::fcntl(fd, libc::F_GETFL);
                flags >= 0 && libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) >= 0
            };
            if !configured {
                libc::close(fd);
                return -1;
            }
            fd
        }
    }
}
#[cfg(unix)]
fn do_bind_udp(fd: RawFd, addr: SocketAddr) -> i32 {
unsafe {
if addr.is_ipv4() {
if let SocketAddr::V4(v4) = addr {
#[cfg(target_os = "linux")]
let sockaddr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: v4.port().to_be(),
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(v4.ip().octets()),
},
sin_zero: [0; 8],
};
#[cfg(not(target_os = "linux"))]
let sockaddr = libc::sockaddr_in {
sin_len: size_of::<libc::sockaddr_in>() as u8,
sin_family: libc::AF_INET as u8,
sin_port: v4.port().to_be(),
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(v4.ip().octets()),
},
sin_zero: [0; 8],
};
libc::bind(
fd,
&sockaddr as *const _ as *const libc::sockaddr,
size_of::<libc::sockaddr_in>() as libc::socklen_t,
)
} else {
-1
}
} else {
if let SocketAddr::V6(v6) = addr {
#[cfg(target_os = "linux")]
let sockaddr = libc::sockaddr_in6 {
sin6_family: libc::AF_INET6 as u16,
sin6_port: v6.port().to_be(),
sin6_flowinfo: v6.flowinfo(),
sin6_addr: libc::in6_addr {
s6_addr: v6.ip().octets(),
},
sin6_scope_id: v6.scope_id(),
};
#[cfg(not(target_os = "linux"))]
let sockaddr = libc::sockaddr_in6 {
sin6_len: size_of::<libc::sockaddr_in6>() as u8,
sin6_family: libc::AF_INET6 as u8,
sin6_port: v6.port().to_be(),
sin6_flowinfo: v6.flowinfo(),
sin6_addr: libc::in6_addr {
s6_addr: v6.ip().octets(),
},
sin6_scope_id: v6.scope_id(),
};
libc::bind(
fd,
&sockaddr as *const _ as *const libc::sockaddr,
size_of::<libc::sockaddr_in6>() as libc::socklen_t,
)
} else {
-1
}
}
}
}
/// Future returned by [`UdpSocket::recv_from`].
pub struct RecvFromFuture<'a, 'b> {
    /// Socket being read from.
    stream: Option<&'a mut UdpSocket>,
    /// Destination buffer for the datagram.
    buf: &'b mut [u8],
}

#[cfg(unix)]
impl RecvFromFuture<'_, '_> {
    /// Converts the sender address filled in by `recvfrom` into a [`SocketAddr`].
    fn decode_peer(storage: &libc::sockaddr_storage) -> io::Result<SocketAddr> {
        let raw = storage as *const libc::sockaddr_storage;
        match storage.ss_family as libc::c_int {
            libc::AF_INET => {
                // SAFETY: the family tag says the storage holds a `sockaddr_in`, and
                // `sockaddr_storage` is large and aligned enough for it.
                let v4 = unsafe { &*(raw as *const libc::sockaddr_in) };
                let ip = Ipv4Addr::from(v4.sin_addr.s_addr.to_ne_bytes());
                Ok(SocketAddr::V4(std::net::SocketAddrV4::new(
                    ip,
                    u16::from_be(v4.sin_port),
                )))
            },
            libc::AF_INET6 => {
                // SAFETY: as above, for `sockaddr_in6`.
                let v6 = unsafe { &*(raw as *const libc::sockaddr_in6) };
                let ip = std::net::Ipv6Addr::from(v6.sin6_addr.s6_addr);
                Ok(SocketAddr::V6(std::net::SocketAddrV6::new(
                    ip,
                    u16::from_be(v6.sin6_port),
                    v6.sin6_flowinfo,
                    v6.sin6_scope_id,
                )))
            },
            _ => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "unsupported socket address family",
            )),
        }
    }
}

impl Future for RecvFromFuture<'_, '_> {
    type Output = io::Result<(usize, SocketAddr)>;

    /// Receives one datagram.
    ///
    /// Resolves to the number of bytes stored in the buffer together with the sender's
    /// address as reported by the kernel. While no datagram is queued the future asks to be
    /// polled again.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let fd = this
            .stream
            .as_ref()
            .expect("RecvFromFuture is always created with a socket")
            .as_raw_fd();
        let dst: &mut [u8] = &mut *this.buf;
        #[cfg(unix)]
        {
            // SAFETY: `sockaddr_storage` is plain data for which all-zero bytes are valid.
            let mut storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
            let mut storage_len = size_of::<libc::sockaddr_storage>() as libc::socklen_t;
            // SAFETY: `dst` and `storage` are live, writable and passed with their sizes.
            let rc = unsafe {
                libc::recvfrom(
                    fd,
                    dst.as_mut_ptr() as *mut libc::c_void,
                    dst.len(),
                    0,
                    &mut storage as *mut libc::sockaddr_storage as *mut libc::sockaddr,
                    &mut storage_len,
                )
            };
            if rc < 0 {
                let err = io::Error::last_os_error();
                return match err.kind() {
                    io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
                        // Nothing registers the socket with a reactor, so returning
                        // `Pending` without a wake-up could stall the task forever.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    },
                    _ => Poll::Ready(Err(err)),
                };
            }
            match Self::decode_peer(&storage) {
                Ok(peer_addr) => Poll::Ready(Ok((rc as usize, peer_addr))),
                Err(err) => Poll::Ready(Err(err)),
            }
        }
        #[cfg(not(unix))]
        {
            let _ = (fd, dst, cx);
            Poll::Ready(Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "UDP recv_from not yet implemented on this platform",
            )))
        }
    }
}
/// Future returned by [`UdpSocket::send_to`]; resolves to the number of bytes sent.
pub struct SendToFuture<'a, 'b> {
    /// Socket used for sending.
    stream: Option<&'a mut UdpSocket>,
    /// Datagram payload.
    buf: &'b [u8],
    /// Destination address.
    addr: SocketAddr,
}

impl Future for SendToFuture<'_, '_> {
    type Output = io::Result<usize>;

    /// Sends the payload as one datagram with `sendto(2)`.
    ///
    /// While the socket cannot accept the datagram the future asks to be polled again.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let fd = self
            .stream
            .as_ref()
            .expect("SendToFuture is always created with a socket")
            .as_raw_fd();
        #[cfg(unix)]
        {
            let payload = self.buf.as_ptr() as *const libc::c_void;
            let payload_len = self.buf.len();
            let rc = match self.addr {
                SocketAddr::V4(v4) => {
                    let dest = libc::sockaddr_in {
                        // Same platform gate as the other sockaddr builders in this file.
                        #[cfg(not(target_os = "linux"))]
                        sin_len: size_of::<libc::sockaddr_in>() as u8,
                        sin_family: libc::AF_INET as libc::sa_family_t,
                        sin_port: v4.port().to_be(),
                        sin_addr: libc::in_addr {
                            // The octets are already in network order; keep their layout.
                            s_addr: u32::from_ne_bytes(v4.ip().octets()),
                        },
                        sin_zero: [0; 8],
                    };
                    // SAFETY: payload and address pointers reference live data and are
                    // passed together with their exact sizes.
                    unsafe {
                        libc::sendto(
                            fd,
                            payload,
                            payload_len,
                            0,
                            &dest as *const libc::sockaddr_in as *const libc::sockaddr,
                            size_of::<libc::sockaddr_in>() as libc::socklen_t,
                        )
                    }
                },
                SocketAddr::V6(v6) => {
                    let dest = libc::sockaddr_in6 {
                        #[cfg(not(target_os = "linux"))]
                        sin6_len: size_of::<libc::sockaddr_in6>() as u8,
                        sin6_family: libc::AF_INET6 as libc::sa_family_t,
                        sin6_port: v6.port().to_be(),
                        // Passed through unchanged, like every other sockaddr builder in
                        // this file.
                        sin6_flowinfo: v6.flowinfo(),
                        sin6_addr: libc::in6_addr {
                            s6_addr: v6.ip().octets(),
                        },
                        sin6_scope_id: v6.scope_id(),
                    };
                    // SAFETY: as above.
                    unsafe {
                        libc::sendto(
                            fd,
                            payload,
                            payload_len,
                            0,
                            &dest as *const libc::sockaddr_in6 as *const libc::sockaddr,
                            size_of::<libc::sockaddr_in6>() as libc::socklen_t,
                        )
                    }
                },
            };
            if rc < 0 {
                let err = io::Error::last_os_error();
                return match err.kind() {
                    io::ErrorKind::WouldBlock | io::ErrorKind::Interrupted => {
                        // Nothing registers the socket with a reactor, so returning
                        // `Pending` without a wake-up could stall the task forever.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    },
                    _ => Poll::Ready(Err(err)),
                };
            }
            Poll::Ready(Ok(rc as usize))
        }
        #[cfg(not(unix))]
        {
            let _ = (fd, cx);
            Poll::Ready(Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "UDP send_to not yet implemented on this platform",
            )))
        }
    }
}
pub struct ConnectUdpFuture {
fd: RawFd,
addr: SocketAddr,
done: bool,
}
impl Future for ConnectUdpFuture {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(!self.done, "ConnectUdpFuture polled after completion");
#[cfg(unix)]
{
let result = unsafe {
match self.addr {
SocketAddr::V4(v4) => {
#[cfg(target_os = "linux")]
let sockaddr = libc::sockaddr_in {
sin_family: libc::AF_INET as u16,
sin_port: v4.port().to_be(),
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(v4.ip().octets()),
},
sin_zero: [0; 8],
};
#[cfg(not(target_os = "linux"))]
let sockaddr = libc::sockaddr_in {
sin_len: size_of::<libc::sockaddr_in>() as u8,
sin_family: libc::AF_INET as u8,
sin_port: v4.port().to_be(),
sin_addr: libc::in_addr {
s_addr: u32::from_ne_bytes(v4.ip().octets()),
},
sin_zero: [0; 8],
};
libc::connect(
self.fd,
&sockaddr as *const _ as *const libc::sockaddr,
size_of::<libc::sockaddr_in>() as libc::socklen_t,
)
},
SocketAddr::V6(v6) => {
#[cfg(target_os = "linux")]
let sockaddr = libc::sockaddr_in6 {
sin6_family: libc::AF_INET6 as u16,
sin6_port: v6.port().to_be(),
sin6_flowinfo: v6.flowinfo(),
sin6_addr: libc::in6_addr {
s6_addr: v6.ip().octets(),
},
sin6_scope_id: v6.scope_id(),
};
#[cfg(not(target_os = "linux"))]
let sockaddr = libc::sockaddr_in6 {
sin6_len: size_of::<libc::sockaddr_in6>() as u8,
sin6_family: libc::AF_INET6 as u8,
sin6_port: v6.port().to_be(),
sin6_flowinfo: v6.flowinfo(),
sin6_addr: libc::in6_addr {
s6_addr: v6.ip().octets(),
},
sin6_scope_id: v6.scope_id(),
};
libc::connect(
self.fd,
&sockaddr as *const _ as *const libc::sockaddr,
size_of::<libc::sockaddr_in6>() as libc::socklen_t,
)
},
}
};
if result < 0 {
return Poll::Ready(Err(io::Error::last_os_error()));
}
}
#[cfg(not(unix))]
{
let _ = (self.fd, self.addr);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Unsupported,
"UDP connect not yet implemented on this platform",
)));
}
self.done = true;
Poll::Ready(Ok(()))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Wrapping an invalid descriptor must fail instead of producing a stream.
    #[test]
    fn test_tcp_stream_create() {
        let outcome = unsafe { TcpStream::from_raw_fd(-1) };
        assert!(outcome.is_err());
    }

    /// An unparsable listener address is reported through the `Error` variant.
    #[test]
    fn test_tcp_listener_bind_invalid() {
        let future = TcpListener::bind("invalid_address");
        assert!(matches!(future, BindFuture::Error(_)), "Expected Error future");
    }

    /// An unparsable connect address is reported through the `Error` variant.
    #[test]
    fn test_connect_invalid_addr() {
        let future = TcpStream::connect("not_an_address");
        assert!(
            matches!(future, ConnectFuture::Error(_)),
            "Expected Error future for invalid address"
        );
    }
}