#![deny(unsafe_code)]
extern crate lazy_static;
use lazy_static::lazy_static;
use std::io::{self, Error, ErrorKind};
use std::mem;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, TcpListener, TcpStream, ToSocketAddrs};
use std::process;
use std::sync::{Arc, Condvar, Mutex, Once};
use std::thread::Builder;
static SPAWN_SERVER: Once = Once::new();
lazy_static! {
static ref STREAM: Arc<(Mutex<Option<(TcpStream, TcpStream)>>, Condvar)> =
Arc::new((Mutex::new(None), Condvar::new()));
static ref DEFAULT_ADDRESS: SocketAddr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 31398));
}
fn listen_on(address: impl ToSocketAddrs) {
SPAWN_SERVER.call_once(move || {
let address = address
.to_socket_addrs()
.expect("<impl ToSocketAddrs>::to_socket_addrs() at listen_on()")
.next()
.expect("ToSocketAddrs::Iter::next() at listen_on()");
let listener = TcpListener::bind(address).expect("TcpListener::bind() at listen_on()");
let buf = Arc::new((Mutex::new(None), Condvar::new()));
let buf2 = buf.clone();
Builder::new()
.name(String::from("tcp-test listener thread"))
.spawn(move || {
let (ref lock, ref cvar) = &*buf2;
listener_thread(listener, lock, cvar).map_err(|e| {
if cfg!(feature = "only_panic") {
panic!("tcp-test internal error: {}", e);
} else if cfg!(not(feature = "only_panic")) {
eprintln!(
"tcp-test internal error: {error}, {file}:{line}:{column}",
error = e,
file = file!(),
line = line!(),
column = column!()
);
process::exit(1);
};
})
})
.expect("Builder::spawn() at listen_on()");
Builder::new()
.name(String::from("tcp-test channel thread"))
.spawn(move || {
let &(ref lock, ref cvar) = &*buf;
channel_thread(address, lock, cvar).map_err(|e| {
if cfg!(feature = "only_panic") {
panic!("tcp-test internal error: {}", e);
} else if cfg!(not(feature = "only_panic")) {
eprintln!(
"tcp-test internal error: {error}, {file}:{line}:{column}",
error = e,
file = file!(),
line = line!(),
column = column!()
);
process::exit(1);
};
})
})
.expect("Builder::spawn() at listen_on()");
});
}
fn listener_thread(
listener: TcpListener,
lock: &Mutex<Option<TcpStream>>,
cvar: &Condvar,
) -> io::Result<()> {
let error = |message| Error::new(ErrorKind::Other, message);
for i in listener.incoming() {
let i = i?;
let mut buf = lock
.lock()
.map_err(|_| error(concat!("failed to lock Mutex, ", line!())))?;
while buf.is_some() {
buf = cvar
.wait(buf)
.map_err(|_| error(concat!("failed to wait for Condvar, ", line!())))?;
}
*buf = Some(i);
cvar.notify_one();
}
Ok(())
}
fn channel_thread(
address: SocketAddr,
lock: &Mutex<Option<TcpStream>>,
cvar: &Condvar,
) -> Result<(), Error> {
let error = |message| Error::new(ErrorKind::Other, message);
loop {
let local = TcpStream::connect(address)?;
let remote = {
let mut remote = lock
.lock()
.map_err(|_| error(concat!("failed to lock Mutex, ", line!())))?;
while remote.is_none() {
remote = cvar
.wait(remote)
.map_err(|_| error(concat!("failed to wait for Condvar, ", line!())))?;
}
mem::replace(&mut *remote, None).unwrap()
};
let &(ref lock, ref cvar) = &*STREAM.clone();
let mut stream = lock
.lock()
.map_err(|_| error(concat!("failed to lock Mutex, ", line!())))?;
while stream.is_some() {
stream = cvar
.wait(stream)
.map_err(|_| error(concat!("failed to wait for Condvar, ", line!())))?;
}
*stream = Some((local, remote));
cvar.notify_one();
}
}
#[inline]
pub fn channel() -> (TcpStream, TcpStream) {
channel_on(*DEFAULT_ADDRESS)
}
pub fn channel_on(address: impl ToSocketAddrs) -> (TcpStream, TcpStream) {
listen_on(address);
let &(ref lock, ref cvar) = &*STREAM.clone();
let mut buf = lock.lock().unwrap();
while buf.is_none() {
buf = cvar.wait(buf).unwrap();
}
let channel = mem::replace(&mut *buf, None);
cvar.notify_all();
channel.unwrap()
}
#[macro_export]
macro_rules! read_assert {
($resource:expr, $n:expr, $expected:expr) => {{
match &$expected {
expected => {
use std::io::Read;
let mut buf = [0; $n];
$resource
.read_exact(&mut buf)
.expect("failed to read in read_assert!");
assert_eq!(
&buf[..],
&expected[..],
"read_assert! buffers are not equal"
);
}
}
}};
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{self, Read};
struct Placeholder;
impl Read for Placeholder {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
Ok(buf.len())
}
}
#[test]
fn read_assert_ok() {
read_assert!(Placeholder {}, 9, [0; 9]);
}
#[test]
#[should_panic]
fn read_assert_panic() {
read_assert!(Placeholder {}, 1, [0xff]);
}
}