use super::stream::GlommioStream;
use crate::{
net::{
stream::{Buffered, NonBuffered, Preallocated, RxBuf},
yolo_accept,
},
reactor::Reactor,
GlommioError,
};
use futures_lite::{
future::poll_fn,
io::{AsyncBufRead, AsyncRead, AsyncWrite},
stream::{self, Stream},
};
use nix::sys::socket::{InetAddr, SockAddr};
use pin_project_lite::pin_project;
use socket2::{Domain, Protocol, Socket, Type};
use std::{
io,
net::{self, Shutdown, SocketAddr, ToSocketAddrs},
os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd},
pin::Pin,
rc::{Rc, Weak},
task::{Context, Poll},
time::Duration,
};
type Result<T> = crate::Result<T, ()>;
#[derive(Debug)]
pub struct TcpListener {
reactor: Weak<Reactor>,
listener: net::TcpListener,
}
impl FromRawFd for TcpListener {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
let sk = Socket::from_raw_fd(fd);
let listener = sk.into_tcp_listener();
TcpListener {
reactor: Rc::downgrade(&crate::executor().reactor()),
listener,
}
}
}
impl TcpListener {
pub fn bind<A: ToSocketAddrs>(addr: A) -> Result<TcpListener> {
let addr = addr
.to_socket_addrs()
.unwrap()
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "empty address"))?;
let domain = if addr.is_ipv6() {
Domain::ipv6()
} else {
Domain::ipv4()
};
let sk = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?;
let addr = socket2::SockAddr::from(addr);
sk.set_reuse_port(true)?;
sk.bind(&addr)?;
sk.listen(1024)?;
let listener = sk.into_tcp_listener();
Ok(TcpListener {
reactor: Rc::downgrade(&crate::executor().reactor()),
listener,
})
}
pub async fn shared_accept(&self) -> Result<AcceptedTcpStream> {
let reactor = self.reactor.upgrade().unwrap();
let raw_fd = self.listener.as_raw_fd();
if let Some(r) = yolo_accept(raw_fd) {
match r {
Ok(fd) => {
return Ok(AcceptedTcpStream { fd });
}
Err(err) => return Err(GlommioError::IoError(err)),
}
}
let source = reactor.accept(self.listener.as_raw_fd());
let fd = source.collect_rw().await?;
Ok(AcceptedTcpStream { fd: fd as RawFd })
}
pub async fn accept(&self) -> Result<TcpStream> {
let a = self.shared_accept().await?;
Ok(a.bind_to_executor())
}
pub fn incoming(&self) -> impl Stream<Item = Result<TcpStream>> + Unpin + '_ {
Box::pin(stream::unfold(self, |listener| async move {
let res = listener.accept().await;
Some((res, listener))
}))
}
pub fn local_addr(&self) -> Result<SocketAddr> {
Ok(self.listener.local_addr()?)
}
pub fn ttl(&self) -> Result<u32> {
Ok(self.listener.ttl()?)
}
pub fn set_ttl(&self, ttl: u32) -> Result<()> {
Ok(self.listener.set_ttl(ttl)?)
}
}
#[derive(Copy, Clone, Debug)]
pub struct AcceptedTcpStream {
fd: RawFd,
}
impl AcceptedTcpStream {
pub fn peer_addr(&self) -> Result<SocketAddr> {
let socket = unsafe { Socket::from_raw_fd(self.fd) };
let sock_addr = socket.peer_addr()?;
socket.into_raw_fd();
Ok(sock_addr.as_std().unwrap())
}
pub fn bind_to_executor(self) -> TcpStream {
TcpStream {
stream: unsafe { GlommioStream::from_raw_fd(self.fd) },
}
}
}
pin_project! {
#[derive(Debug)]
pub struct TcpStream<B: RxBuf = NonBuffered> {
stream: GlommioStream<net::TcpStream, B>
}
}
impl From<socket2::Socket> for TcpStream {
fn from(socket: socket2::Socket) -> TcpStream {
Self {
stream: GlommioStream::from(socket),
}
}
}
impl<B: RxBuf> AsRawFd for TcpStream<B> {
fn as_raw_fd(&self) -> RawFd {
self.stream.stream().as_raw_fd()
}
}
impl FromRawFd for TcpStream {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
let socket = socket2::Socket::from_raw_fd(fd);
TcpStream::from(socket)
}
}
fn make_tcp_socket(addr: &SocketAddr) -> io::Result<(SockAddr, Socket)> {
let domain = if addr.is_ipv6() {
Domain::ipv6()
} else {
Domain::ipv4()
};
let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?;
let inet = InetAddr::from_std(addr);
let addr = SockAddr::new_inet(inet);
Ok((addr, socket))
}
impl TcpStream {
pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<TcpStream> {
let addr = addr.to_socket_addrs()?.next().unwrap();
let (addr, socket) = make_tcp_socket(&addr)?;
let reactor = crate::executor().reactor();
let source = reactor.connect(socket.as_raw_fd(), addr);
source.collect_rw().await?;
Ok(TcpStream {
stream: GlommioStream::from(socket),
})
}
pub async fn connect_timeout<A: ToSocketAddrs>(
addr: A,
duration: Duration,
) -> Result<TcpStream> {
if duration.as_secs() == 0 && duration.subsec_nanos() == 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"cannot set a 0 duration timeout",
)
.into());
}
let addr = addr.to_socket_addrs()?.next().unwrap();
let (addr, socket) = make_tcp_socket(&addr)?;
let reactor = crate::executor().reactor();
let source = reactor.connect_timeout(socket.as_raw_fd(), addr, duration);
source
.collect_rw()
.await
.map_err(|err| match err.raw_os_error() {
Some(libc::ECANCELED) => {
io::Error::new(io::ErrorKind::TimedOut, "connection timed out")
}
_ => err,
})?;
Ok(TcpStream {
stream: GlommioStream::from(socket),
})
}
pub fn buffered(self) -> TcpStream<Preallocated> {
self.buffered_with(Preallocated::default())
}
pub fn buffered_with<B: Buffered>(self, buf: B) -> TcpStream<B> {
TcpStream {
stream: self.stream.buffered_with(buf),
}
}
}
impl<B: RxBuf> TcpStream<B> {
pub fn set_read_timeout(&self, dur: Option<Duration>) -> Result<()> {
self.stream.set_read_timeout(dur)
}
pub fn set_write_timeout(&self, dur: Option<Duration>) -> Result<()> {
self.stream.set_write_timeout(dur)
}
pub fn read_timeout(&self) -> Option<Duration> {
self.stream.read_timeout()
}
pub fn write_timeout(&self) -> Option<Duration> {
self.stream.write_timeout()
}
pub async fn shutdown(&self, how: Shutdown) -> Result<()> {
poll_fn(|cx| self.stream.poll_shutdown(cx, how))
.await
.map_err(Into::into)
}
pub fn set_nodelay(&self, value: bool) -> Result<()> {
self.stream.stream().set_nodelay(value).map_err(Into::into)
}
pub fn nodelay(&self) -> Result<bool> {
self.stream.stream().nodelay().map_err(Into::into)
}
pub fn ttl(&self) -> Result<u32> {
Ok(self.stream.stream().ttl()?)
}
pub fn set_ttl(&self, ttl: u32) -> Result<()> {
Ok(self.stream.stream().set_ttl(ttl)?)
}
pub async fn peek(&self, buf: &mut [u8]) -> Result<usize> {
self.stream.peek(buf).await.map_err(Into::into)
}
pub fn peer_addr(&self) -> Result<SocketAddr> {
self.stream.stream().peer_addr().map_err(Into::into)
}
pub fn local_addr(&self) -> Result<SocketAddr> {
self.stream.stream().local_addr().map_err(Into::into)
}
}
impl<B: Buffered + Unpin> AsyncBufRead for TcpStream<B> {
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&'a [u8]>> {
let this = self.project();
this.stream.poll_fill_buf(cx)
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.stream.consume(amt);
}
}
impl<B: RxBuf + Unpin> AsyncRead for TcpStream<B> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stream).poll_read(cx, buf)
}
}
impl<B: RxBuf + Unpin> AsyncWrite for TcpStream<B> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_close(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{channels::shared_channel, enclose, timer::Timer, LocalExecutorBuilder};
use futures_lite::{
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt},
StreamExt,
};
use std::{
cell::Cell,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
#[test]
fn tcp_listener_ttl() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
listener.set_ttl(100).unwrap();
assert_eq!(listener.ttl().unwrap(), 100);
});
}
#[test]
fn tcp_stream_ttl() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let stream = TcpStream::connect(addr).await.unwrap();
stream.set_ttl(100).unwrap();
assert_eq!(stream.ttl().unwrap(), 100);
});
}
#[test]
fn tcp_stream_nodelay() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let stream = TcpStream::connect(addr).await.unwrap();
stream.set_nodelay(true).expect("set_nodelay call failed");
assert!(stream.nodelay().unwrap());
});
}
#[test]
fn connect_local_server() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let coord = Rc::new(Cell::new(0));
let listener_handle = crate::spawn_local(enclose! { (coord) async move {
coord.set(1);
let stream = listener.accept().await?;
stream.peer_addr()
}});
while coord.get() != 1 {
crate::executor().yield_task_queue_now().await;
}
let stream = TcpStream::connect(addr).await.unwrap();
assert_eq!(listener_handle.await.unwrap(), stream.local_addr().unwrap());
});
}
#[test]
fn multi_executor_bind_works() {
test_executor!(async move {
let addr_getter = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = addr_getter.local_addr().unwrap();
let (first_sender, first_receiver) = shared_channel::new_bounded(1);
let (second_sender, second_receiver) = shared_channel::new_bounded(1);
let ex1 = LocalExecutorBuilder::default()
.spawn(move || async move {
let receiver = first_receiver.connect().await;
let _ = TcpListener::bind(addr).unwrap();
receiver.recv().await.unwrap();
})
.unwrap();
let ex2 = LocalExecutorBuilder::default()
.spawn(move || async move {
let receiver = second_receiver.connect().await;
let _ = TcpListener::bind(addr).unwrap();
receiver.recv().await.unwrap();
})
.unwrap();
Timer::new(Duration::from_millis(100)).await;
let sender = first_sender.connect().await;
sender.try_send(0).unwrap();
let sender = second_sender.connect().await;
sender.try_send(0).unwrap();
ex1.join().unwrap();
ex2.join().unwrap();
});
}
#[test]
fn multi_executor_accept() {
let (sender, receiver) = shared_channel::new_bounded(1);
let (addr_sender, addr_receiver) = shared_channel::new_bounded(1);
let connected = Arc::new(AtomicUsize::new(0));
let status = connected.clone();
let ex1 = LocalExecutorBuilder::default()
.spawn(move || async move {
let sender = sender.connect().await;
let addr_sender = addr_sender.connect().await;
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
addr_sender.try_send(addr).unwrap();
status.store(1, Ordering::Relaxed);
let accepted = listener.shared_accept().await.unwrap();
sender.try_send(accepted).unwrap();
})
.unwrap();
let status = connected.clone();
let ex2 = LocalExecutorBuilder::default()
.spawn(move || async move {
let receiver = receiver.connect().await;
let accepted = receiver.recv().await.unwrap();
let _ = accepted.bind_to_executor();
status.store(2, Ordering::Relaxed);
})
.unwrap();
let ex3 = LocalExecutorBuilder::default()
.spawn(move || async move {
let receiver = addr_receiver.connect().await;
let addr = receiver.recv().await.unwrap();
TcpStream::connect(addr).await.unwrap();
})
.unwrap();
ex1.join().unwrap();
ex2.join().unwrap();
ex3.join().unwrap();
assert_eq!(connected.load(Ordering::Relaxed), 2);
}
#[test]
fn stream_of_connections() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let coord = Rc::new(Cell::new(0));
let listener_handle = crate::spawn_local(enclose! { (coord) async move {
coord.set(1);
listener.incoming().take(4).try_for_each(|addr| {
addr.map(|_| ())
}).await
}});
while coord.get() != 1 {
crate::executor().yield_task_queue_now().await;
}
let mut handles = Vec::with_capacity(4);
for _ in 0..4 {
handles.push(
crate::spawn_local(async move { TcpStream::connect(addr).await }).detach(),
);
}
for handle in handles.drain(..) {
handle.await.unwrap().unwrap();
}
listener_handle.await.unwrap();
let res = TcpStream::connect(addr).await;
assert!(res.is_err())
});
}
#[test]
fn parallel_accept() {
test_executor!(async move {
let listener = Rc::new(TcpListener::bind("127.0.0.1:0").unwrap());
let addr = listener.local_addr().unwrap();
let mut handles = Vec::new();
for _ in 0..128 {
handles.push(
crate::spawn_local(enclose! { (listener) async move {
let _accept = listener.accept().await.unwrap();
}})
.detach(),
);
}
Timer::new(Duration::from_millis(100)).await;
for _ in 0..128 {
handles.push(
crate::spawn_local(async move {
let _stream = TcpStream::connect(addr).await.unwrap();
})
.detach(),
);
}
for handle in handles {
handle.await.unwrap();
}
});
}
#[test]
fn connect_and_ping_pong() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let coord = Rc::new(Cell::new(0));
let listener_handle = crate::spawn_local(enclose! { (coord) async move {
coord.set(1);
let mut stream = listener.accept().await?;
let mut byte = [0u8; 1];
let read = stream.read(&mut byte).await?;
assert_eq!(read, 1);
io::Result::Ok(byte[0])
}})
.detach();
while coord.get() != 1 {
crate::executor().yield_task_queue_now().await;
}
let mut stream = TcpStream::connect(addr).await.unwrap();
let byte = [65u8; 1];
let b = stream.write(&byte).await.unwrap();
assert_eq!(b, 1);
assert_eq!(listener_handle.await.unwrap().unwrap(), 65u8);
});
}
#[test]
fn test_read_until() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let listener_handle = crate::spawn_local(async move {
let mut stream = listener.accept().await?.buffered();
let mut buf = Vec::new();
stream.read_until(10, &mut buf).await?;
io::Result::Ok(buf.len())
})
.detach();
let mut stream = TcpStream::connect(addr).await.unwrap();
let vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let b = stream.write(&vec).await.unwrap();
assert_eq!(b, 10);
assert_eq!(listener_handle.await.unwrap().unwrap(), 10);
});
}
#[test]
fn test_read_line() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let listener_handle = crate::spawn_local(async move {
let mut stream = listener.accept().await?.buffered();
let mut buf = String::new();
stream.read_line(&mut buf).await?;
io::Result::Ok(buf.len())
})
.detach();
let mut stream = TcpStream::connect(addr).await.unwrap();
let b = stream.write(b"line\n").await.unwrap();
assert_eq!(b, 5);
assert_eq!(listener_handle.await.unwrap().unwrap(), 5);
});
}
#[test]
fn test_lines() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let listener_handle = crate::spawn_local(async move {
let stream = listener.accept().await?.buffered();
io::Result::Ok(stream.lines().count().await)
})
.detach();
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write(b"line1\nline2\nline3\n").await.unwrap();
stream.write(b"line4\nline5\nline6\n").await.unwrap();
stream.close().await.unwrap();
assert_eq!(listener_handle.await.unwrap().unwrap(), 6);
});
}
#[test]
fn multibuf_fill() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let listener_handle = crate::spawn_local(async move {
let mut stream = listener.accept().await?.buffered();
let buf = stream.fill_buf().await?;
assert_eq!(&buf[0..4], b"msg1");
stream.consume(4);
let buf = stream.fill_buf().await?;
assert_eq!(buf, b"msg2");
stream.consume(4);
let buf = stream.fill_buf().await?;
assert_eq!(buf.len(), 0);
io::Result::Ok(())
})
.detach();
let mut stream = TcpStream::connect(addr).await.unwrap();
let b = stream.write(b"msg1").await.unwrap();
assert_eq!(b, 4);
stream.write(b"msg2").await.unwrap();
assert_eq!(b, 4);
stream.close().await.unwrap();
listener_handle.await.unwrap().unwrap();
});
}
#[test]
fn overconsume() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let listener_handle = crate::spawn_local(async move {
let mut stream = listener.accept().await?.buffered();
let buf = stream.fill_buf().await?;
assert_eq!(buf.len(), 4);
stream.consume(100);
let buf = stream.fill_buf().await?;
assert_eq!(buf.len(), 0);
io::Result::Ok(())
})
.detach();
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write(b"msg1").await.unwrap();
stream.close().await.unwrap();
listener_handle.await.unwrap().unwrap();
});
}
#[test]
fn repeated_fill_before_consume() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let listener_handle = crate::spawn_local(async move {
let mut stream = listener.accept().await?.buffered();
let buf = stream.fill_buf().await?;
assert_eq!(buf, b"msg1");
let buf = stream.fill_buf().await?;
assert_eq!(buf, b"msg1");
stream.consume(4);
let buf = stream.fill_buf().await?;
assert!(buf.is_empty());
io::Result::Ok(())
})
.detach();
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write(b"msg1").await.unwrap();
stream.close().await.unwrap();
listener_handle.await.unwrap().unwrap();
});
}
#[test]
fn peek() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let listener_handle = crate::spawn_local(async move {
let mut stream = listener.accept().await?;
let mut buf = [0u8; 64];
for _ in 0..10 {
let b = stream.peek(&mut buf).await?;
assert_eq!(b, 4);
assert_eq!(&buf[0..4], b"msg1");
}
stream.read(&mut buf).await?;
stream.peek(&mut buf).await
})
.detach();
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write(b"msg1").await.unwrap();
stream.close().await.unwrap();
let res = listener_handle.await.unwrap().unwrap();
assert_eq!(res, 0);
});
}
#[test]
fn tcp_connect_timeout() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
match TcpStream::connect_timeout(addr, Duration::from_millis(250)).await {
Ok(_) => {}
Err(e) => panic!("unexpected error {}", e),
}
});
}
#[test]
fn tcp_connect_timeout_error() {
test_executor!(async move {
match TcpStream::connect_timeout("10.255.255.1:80", Duration::from_millis(250)).await {
Ok(_) => panic!("unexpected success"),
Err(GlommioError::IoError(ref e)) if e.kind() == io::ErrorKind::TimedOut => {}
Err(e) => panic!("unexpected error {}", e),
}
});
}
#[test]
fn tcp_read_timeout() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let ltask = crate::spawn_local(async move {
let mut stream = listener.accept().await?;
stream
.set_read_timeout(Some(Duration::from_secs(1)))
.unwrap();
let mut buf = [0u8; 64];
let now = Instant::now();
match stream.read(&mut buf).await {
Ok(_) => unreachable!(),
Err(x) => {
assert_eq!(x.kind(), io::ErrorKind::TimedOut);
}
};
assert!(now.elapsed().as_secs() >= 1);
io::Result::Ok(0)
});
let _s = TcpStream::connect(addr).await.unwrap();
ltask.await.unwrap();
});
}
#[test]
fn tcp_force_poll() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let ltask = crate::spawn_local(async move {
let mut stream = listener.accept().await?;
poll_fn(|cx| {
let mut buf = [0u8; 64];
for _ in 0..64_000 {
if Pin::new(&mut stream).poll_read(cx, &mut buf).is_ready() {
panic!("should be pending");
}
}
Poll::Ready(())
})
.await;
io::Result::Ok(0)
});
let _s = TcpStream::connect(addr).await.unwrap();
ltask.await.unwrap();
});
}
#[test]
fn tcp_invalid_timeout() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let ltask = crate::spawn_local(async move {
let stream = listener.accept().await?;
stream
.set_write_timeout(Some(Duration::from_nanos(0)))
.unwrap_err();
assert!(stream.write_timeout().is_none());
stream
.set_write_timeout(Some(Duration::from_secs(1)))
.unwrap();
assert_eq!(stream.write_timeout(), Some(Duration::from_secs(1)));
io::Result::Ok(0)
});
let _s = TcpStream::connect(addr).await.unwrap();
ltask.await.unwrap();
});
}
#[test]
fn accepted_tcp_stream_peer_addr() {
test_executor!(async move {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let peer_addr = crate::spawn_local(async move {
let accepted = listener.shared_accept().await.unwrap();
let peer_addr = accepted.peer_addr().unwrap();
let stream = accepted.bind_to_executor();
assert_eq!(peer_addr, stream.peer_addr().unwrap());
peer_addr
});
let s = TcpStream::connect(addr).await.unwrap();
assert_eq!(s.local_addr().unwrap(), peer_addr.await);
});
}
}