#![cfg(all(feature = "os-poll", feature = "tcp"))]
use mio::net::TcpStream;
use mio::{Interest, Token};
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::net::{self, Shutdown, SocketAddr};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::sync::{mpsc::channel, Arc, Barrier};
use std::thread;
#[macro_use]
mod util;
#[cfg(not(target_os = "windows"))]
use util::init;
use util::{
any_local_address, any_local_ipv6_address, assert_send, assert_socket_close_on_exec,
assert_socket_non_blocking, assert_sync, assert_would_block, expect_events, expect_no_events,
init_with_poll, ExpectEvent, Readiness,
};
/// First payload sent through the streams under test.
const DATA1: &[u8] = b"Hello world!";
/// Second payload sent through the streams under test.
const DATA2: &[u8] = b"Hello mars!";
/// Length of `DATA1`; derived from the payload so buffer sizes cannot drift
/// out of sync if the payload is ever edited.
const DATA1_LEN: usize = DATA1.len();
/// Length of `DATA2`; derived from the payload for the same reason.
const DATA2_LEN: usize = DATA2.len();
/// Token the tests use when registering a stream with the poll registry.
const ID1: Token = Token(0);
/// Second token, used when re-registering a stream under a different token.
const ID2: Token = Token(1);
/// Checks `TcpStream` against the `assert_send` and `assert_sync` helpers.
#[test]
fn is_send_and_sync() {
    assert_sync::<TcpStream>();
    assert_send::<TcpStream>();
}
/// Runs the shared smoke test against the address from `any_local_address`.
#[test]
fn tcp_stream_ipv4() {
    let addr = any_local_address();
    smoke_test_tcp_stream(addr, TcpStream::connect);
}
/// Runs the shared smoke test against the address from
/// `any_local_ipv6_address`.
#[test]
fn tcp_stream_ipv6() {
    let addr = any_local_ipv6_address();
    smoke_test_tcp_stream(addr, TcpStream::connect);
}
/// Runs the shared smoke test with a stream that was connected through the
/// standard library, switched to non-blocking mode and then converted with
/// `TcpStream::from_std`.
#[test]
fn tcp_stream_std() {
    let connect_via_std = |addr: SocketAddr| -> io::Result<TcpStream> {
        let std_stream = net::TcpStream::connect(addr).unwrap();
        std_stream.set_nonblocking(true).unwrap();
        Ok(TcpStream::from_std(std_stream))
    };

    smoke_test_tcp_stream(any_local_address(), connect_via_std);
}
/// Shared smoke test body.
///
/// Starts an echo server on `addr`, connects to it with the stream produced
/// by `make_stream`, and then exercises `peek`, `read`, `write`, `flush`,
/// `write_vectored` and `read_vectored` on that stream, checking the data
/// that comes back from the echo server.
fn smoke_test_tcp_stream<F>(addr: SocketAddr, make_stream: F)
where
    F: FnOnce(SocketAddr) -> io::Result<TcpStream>,
{
    let (mut poll, mut events) = init_with_poll();

    // `addr` is rebound to the address the echo server actually listens on.
    let (echo_thread, addr) = echo_listener(addr, 1);
    let mut stream = make_stream(addr).unwrap();

    assert_socket_non_blocking(&stream);
    assert_socket_close_on_exec(&stream);

    let interests = Interest::WRITABLE | Interest::READABLE;
    poll.registry()
        .register(&mut stream, ID1, interests)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // Nothing has been written yet, so there is nothing to be echoed back.
    let mut buf = [0; 16];
    assert_would_block(stream.peek(&mut buf));
    assert_would_block(stream.read(&mut buf));

    assert_eq!(stream.peer_addr().unwrap(), addr);
    assert!(stream.local_addr().unwrap().ip().is_loopback());

    // Plain write, then peek and read the echoed bytes.
    checked_write!(stream.write(&DATA1));
    stream.flush().unwrap();

    let readable = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    expect_read!(stream.peek(&mut buf), DATA1);
    expect_read!(stream.read(&mut buf), DATA1);

    assert!(stream.take_error().unwrap().is_none());
    assert_would_block(stream.read(&mut buf));

    // Vectored write of both payloads, then a vectored read of the echo.
    let total_len = DATA1.len() + DATA2.len();
    let out_bufs = [IoSlice::new(DATA1), IoSlice::new(DATA2)];
    let written = stream
        .write_vectored(&out_bufs)
        .expect("unable to write vectored to stream");
    assert_eq!(written, total_len);

    let readable = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    // `buf2` has one spare byte so the test can check it is left untouched.
    let mut buf1 = [1; DATA1_LEN];
    let mut buf2 = [2; DATA2_LEN + 1];
    let mut in_bufs = [IoSliceMut::new(&mut buf1), IoSliceMut::new(&mut buf2)];
    let read = stream
        .read_vectored(&mut in_bufs)
        .expect("unable to read vectored from stream");
    assert_eq!(read, total_len);
    assert_eq!(&buf1, DATA1);
    assert_eq!(&buf2[..DATA2.len()], DATA2);
    assert_eq!(buf2[DATA2.len()], 2);

    // Close the connection so the echo thread can finish.
    drop(stream);
    echo_thread.join().expect("unable to join thread");
}
/// A TTL set with `set_ttl` must be returned by `ttl`.
#[test]
fn set_get_ttl() {
    const TTL: u32 = 10;

    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), false);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::WRITABLE)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    stream.set_ttl(TTL).unwrap();
    assert_eq!(stream.ttl().unwrap(), TTL);
    assert!(stream.take_error().unwrap().is_none());

    // Let the listener thread proceed past its barrier and exit.
    barrier.wait();
    listener_thread.join().expect("unable to join thread");
}
/// `ttl` must succeed on a connected stream even if no TTL was set by the
/// test beforehand.
#[test]
fn get_ttl_without_previous_set() {
    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), false);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::WRITABLE)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // Only the success of the call is checked, not the returned value.
    stream.ttl().expect("unable to get TTL for TCP stream");
    assert!(stream.take_error().unwrap().is_none());

    // Let the listener thread proceed past its barrier and exit.
    barrier.wait();
    listener_thread.join().expect("unable to join thread");
}
/// A value set with `set_nodelay` must be returned by `nodelay`.
#[test]
fn set_get_nodelay() {
    const NO_DELAY: bool = true;

    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), false);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::WRITABLE)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    stream.set_nodelay(NO_DELAY).unwrap();
    assert_eq!(stream.nodelay().unwrap(), NO_DELAY);
    assert!(stream.take_error().unwrap().is_none());

    // Let the listener thread proceed past its barrier and exit.
    barrier.wait();
    listener_thread.join().expect("unable to join thread");
}
/// `nodelay` must succeed on a connected stream even if the option was not
/// set by the test beforehand.
#[test]
fn get_nodelay_without_previous_set() {
    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), false);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::WRITABLE)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // Only the success of the call is checked, not the returned value.
    stream
        .nodelay()
        .expect("Unable to get nodelay for TCP stream");
    assert!(stream.take_error().unwrap().is_none());

    // Let the listener thread proceed past its barrier and exit.
    barrier.wait();
    listener_thread.join().expect("unable to join thread");
}
/// Shuts down the read half of a stream once echoed data is available.
#[test]
fn shutdown_read() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, server_addr) = echo_listener(any_local_address(), 1);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    let interests = Interest::WRITABLE | Interest::READABLE;
    poll.registry()
        .register(&mut stream, ID1, interests)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    checked_write!(stream.write(&DATA2));

    // Wait until the echo has arrived before closing the read half.
    let readable = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    stream.shutdown(Shutdown::Read).unwrap();

    // Only on the platforms listed below does the test additionally expect a
    // read after the shutdown to yield no data.
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    {
        let mut buf = [0; 20];
        expect_read!(stream.read(&mut buf), &[]);
    }

    // Close the connection so the echo thread can finish.
    drop(stream);
    echo_thread.join().expect("unable to join thread");
}
/// Shuts down the write half of a stream: further writes must fail with
/// `BrokenPipe`, while the data echoed for the earlier write must still be
/// readable.
#[test]
#[ignore = "This test is flaky, it doesn't always receive an event after shutting down the write side"]
fn shutdown_write() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, server_addr) = echo_listener(any_local_address(), 1);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    let interests = Interest::WRITABLE | Interest::READABLE;
    poll.registry()
        .register(&mut stream, ID1, interests)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    checked_write!(stream.write(&DATA1));

    stream.shutdown(Shutdown::Write).unwrap();

    let write_err = stream.write(DATA2).unwrap_err();
    assert_eq!(write_err.kind(), io::ErrorKind::BrokenPipe);

    let readable = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    // The read half is still open, so the echo of `DATA1` is expected here.
    let mut buf = [0; 20];
    expect_read!(stream.read(&mut buf), DATA1);

    drop(stream);
    echo_thread.join().expect("unable to join thread");
}
/// Shuts down both halves of a stream and checks that a later write fails
/// with the platform-specific error kind.
#[test]
fn shutdown_both() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, server_addr) = echo_listener(any_local_address(), 1);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    let interests = Interest::WRITABLE | Interest::READABLE;
    poll.registry()
        .register(&mut stream, ID1, interests)
        .expect("unable to register TCP stream");

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    checked_write!(stream.write(&DATA1));

    // Wait until the echo has arrived before shutting the stream down.
    let readable = vec![ExpectEvent::new(ID1, Interest::READABLE)];
    expect_events(&mut poll, &mut events, readable);

    stream.shutdown(Shutdown::Both).unwrap();

    // Only on the platforms listed below does the test additionally expect a
    // read after the shutdown to yield no data.
    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "openbsd"
    ))]
    {
        let mut buf = [0; 20];
        expect_read!(stream.read(&mut buf), &[]);
    }

    // The expected error kind differs between Unix and Windows.
    let write_err = stream.write(DATA2).unwrap_err();
    #[cfg(unix)]
    assert_eq!(write_err.kind(), io::ErrorKind::BrokenPipe);
    #[cfg(windows)]
    assert_eq!(write_err.kind(), io::ErrorKind::ConnectionAborted);

    drop(stream);
    echo_thread.join().expect("unable to join thread");
}
/// Round-trips a stream through `as_raw_fd`, `into_raw_fd` and `from_raw_fd`
/// and checks that the descriptor and local address are preserved.
#[cfg(unix)]
#[test]
fn raw_fd() {
    init();

    let (listener_thread, server_addr) = start_listener(1, None, false);

    let stream = TcpStream::connect(server_addr).unwrap();
    let local_addr = stream.local_addr().unwrap();

    let borrowed_fd = stream.as_raw_fd();
    let owned_fd = stream.into_raw_fd();
    assert_eq!(borrowed_fd, owned_fd);

    // SAFETY: `owned_fd` was just released by `into_raw_fd` above and nothing
    // else in this test uses it, so the new stream becomes its only owner.
    let stream = unsafe { TcpStream::from_raw_fd(owned_fd) };
    assert_eq!(stream.as_raw_fd(), borrowed_fd);
    assert_eq!(stream.local_addr().unwrap(), local_addr);

    listener_thread.join().expect("unable to join thread");
}
/// Registering a stream for readable interest only must not produce events
/// while the echo server has nothing to send.
#[test]
fn registering() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, server_addr) = echo_listener(any_local_address(), 1);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::READABLE)
        .expect("unable to register TCP stream");

    expect_no_events(&mut poll, &mut events);

    // Close the connection so the echo thread can finish.
    drop(stream);
    echo_thread.join().expect("unable to join thread");
}
/// After re-registering with a new token and interest, events must be
/// reported under the new token.
#[test]
fn reregistering() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, server_addr) = echo_listener(any_local_address(), 1);

    let mut stream = TcpStream::connect(server_addr).unwrap();

    // Start with readable interest under `ID1` ...
    poll.registry()
        .register(&mut stream, ID1, Interest::READABLE)
        .expect("unable to register TCP stream");

    // ... then switch to writable interest under `ID2`.
    poll.registry()
        .reregister(&mut stream, ID2, Interest::WRITABLE)
        .expect("unable to reregister TCP stream");

    let writable = vec![ExpectEvent::new(ID2, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    assert_eq!(stream.peer_addr().unwrap(), server_addr);

    // Close the connection so the echo thread can finish.
    drop(stream);
    echo_thread.join().expect("unable to join thread");
}
/// Once a stream is deregistered, neither the connection itself nor traffic
/// on it may produce events.
#[test]
fn no_events_after_deregister() {
    let (mut poll, mut events) = init_with_poll();
    let (echo_thread, server_addr) = echo_listener(any_local_address(), 1);

    let mut stream = TcpStream::connect(server_addr).unwrap();

    let interests = Interest::WRITABLE | Interest::READABLE;
    poll.registry()
        .register(&mut stream, ID1, interests)
        .expect("unable to register TCP stream");
    poll.registry()
        .deregister(&mut stream)
        .expect("unable to deregister TCP stream");

    expect_no_events(&mut poll, &mut events);

    // The stream itself must keep working after being deregistered.
    assert_eq!(stream.peer_addr().unwrap(), server_addr);

    let mut buf = [0; 16];
    assert_would_block(stream.peek(&mut buf));
    assert_would_block(stream.read(&mut buf));

    checked_write!(stream.write(&DATA1));
    stream.flush().unwrap();

    // Writing to the echo server must not produce an event either.
    expect_no_events(&mut poll, &mut events);

    drop(stream);
    echo_thread.join().expect("unable to join thread");
}
/// Shutting down the client's read half must produce an event with
/// `READ_CLOSED` readiness.
#[test]
#[cfg_attr(
    windows,
    ignore = "fails on Windows; client read closed events are not triggered"
)]
fn tcp_shutdown_client_read_close_event() {
    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), false);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::READABLE | Interest::WRITABLE)
        .unwrap();

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    stream.shutdown(Shutdown::Read).unwrap();

    let read_closed = vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)];
    expect_events(&mut poll, &mut events, read_closed);

    // Let the listener thread proceed past its barrier and exit.
    barrier.wait();
    listener_thread.join().expect("failed to join thread");
}
/// Shutting down the client's write half must produce an event with
/// `WRITE_CLOSED` readiness. The test is ignored on the platforms named in
/// the attributes below.
#[test]
#[cfg_attr(windows, ignore = "fails; client write_closed events are not found")]
#[cfg_attr(
    any(target_os = "linux", target_os = "android", target_os = "solaris"),
    ignore = "fails; client write_closed events are not found"
)]
fn tcp_shutdown_client_write_close_event() {
    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), false);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::READABLE | Interest::WRITABLE)
        .unwrap();

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    stream.shutdown(Shutdown::Write).unwrap();

    let write_closed = vec![ExpectEvent::new(ID1, Readiness::WRITE_CLOSED)];
    expect_events(&mut poll, &mut events, write_closed);

    // Let the listener thread proceed past its barrier and exit.
    barrier.wait();
    listener_thread.join().expect("failed to join thread");
}
/// When the server shuts down its write half, the client must receive an
/// event with `READ_CLOSED` readiness.
#[test]
fn tcp_shutdown_server_write_close_event() {
    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    // `true` asks the listener to shut down its write half between two
    // barrier waits.
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), true);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::READABLE | Interest::WRITABLE)
        .unwrap();

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    // First wait: releases the listener so it performs the shutdown.
    barrier.wait();

    let read_closed = vec![ExpectEvent::new(ID1, Readiness::READ_CLOSED)];
    expect_events(&mut poll, &mut events, read_closed);

    // Second wait: lets the listener drop its stream and exit.
    barrier.wait();
    listener_thread.join().expect("failed to join thread");
}
/// Shutting down both halves on the client must produce an event with
/// `WRITE_CLOSED` readiness.
#[test]
#[cfg_attr(
    windows,
    ignore = "fails on Windows; client close events are not found"
)]
fn tcp_shutdown_client_both_close_event() {
    let (mut poll, mut events) = init_with_poll();

    let barrier = Arc::new(Barrier::new(2));
    let (listener_thread, server_addr) = start_listener(1, Some(Arc::clone(&barrier)), false);

    let mut stream = TcpStream::connect(server_addr).unwrap();
    poll.registry()
        .register(&mut stream, ID1, Interest::READABLE | Interest::WRITABLE)
        .unwrap();

    let writable = vec![ExpectEvent::new(ID1, Interest::WRITABLE)];
    expect_events(&mut poll, &mut events, writable);

    stream.shutdown(Shutdown::Both).unwrap();

    let write_closed = vec![ExpectEvent::new(ID1, Readiness::WRITE_CLOSED)];
    expect_events(&mut poll, &mut events, write_closed);

    // Let the listener thread proceed past its barrier and exit.
    barrier.wait();
    listener_thread.join().expect("failed to join thread");
}
/// Spawns a thread running a blocking echo server bound to `addr`.
///
/// The server accepts `n_connections` connections one after another and, for
/// each, writes back everything it reads until the peer closes the
/// connection (a read of 0 bytes) or resets it.
///
/// Returns the thread's join handle and the address the listener is actually
/// bound to.
fn echo_listener(addr: SocketAddr, n_connections: usize) -> (thread::JoinHandle<()>, SocketAddr) {
    let (addr_tx, addr_rx) = channel();

    let thread_handle = thread::spawn(move || {
        let listener = net::TcpListener::bind(addr).unwrap();
        // Report the bound address before accepting anything.
        addr_tx.send(listener.local_addr().unwrap()).unwrap();

        let mut buf = [0; 128];
        for _ in 0..n_connections {
            let (mut stream, _) = listener.accept().unwrap();
            loop {
                // A connection reset is treated the same as a clean close.
                let read_result = match stream.read(&mut buf) {
                    Err(ref err) if err.kind() == io::ErrorKind::ConnectionReset => Ok(0),
                    other => other,
                };
                let n = read_result.expect("error reading");
                if n == 0 {
                    break;
                }
                checked_write!(stream.write(&buf[..n]));
            }
        }
    });

    let local_addr = addr_rx.recv().unwrap();
    (thread_handle, local_addr)
}
/// Spawns a thread with a blocking listener bound to `any_local_address()`
/// that accepts `n_connections` connections and drops each of them.
///
/// If `barrier` is provided, the thread waits on it after accepting each
/// connection. If `shutdown_write` is also `true`, it then shuts down the
/// write half of the accepted stream and waits on the barrier a second time
/// before dropping the stream. Without a barrier, `shutdown_write` has no
/// effect.
///
/// Returns the thread's join handle and the address the listener is bound to.
fn start_listener(
    n_connections: usize,
    barrier: Option<Arc<Barrier>>,
    shutdown_write: bool,
) -> (thread::JoinHandle<()>, SocketAddr) {
    let (addr_tx, addr_rx) = channel();

    let thread_handle = thread::spawn(move || {
        let listener = net::TcpListener::bind(any_local_address()).unwrap();
        // Report the bound address before accepting anything.
        addr_tx.send(listener.local_addr().unwrap()).unwrap();

        for _ in 0..n_connections {
            let (stream, _) = listener.accept().unwrap();

            match barrier.as_ref() {
                Some(barrier) if shutdown_write => {
                    barrier.wait();
                    stream.shutdown(Shutdown::Write).unwrap();
                    barrier.wait();
                }
                Some(barrier) => {
                    barrier.wait();
                }
                None => {}
            }

            drop(stream);
        }
    });

    let local_addr = addr_rx.recv().unwrap();
    (thread_handle, local_addr)
}