use std::io::{BufRead, BufReader, Read, Write};
use std::sync::{Arc, Mutex};
use go_lib::{
chan::chan,
go,
net::{TcpListener, TcpStream},
sync::WaitGroup,
};
static NET_LOCK: Mutex<()> = Mutex::new(());
#[test]
fn net_listener_local_addr() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
let addr = listener.local_addr().expect("local_addr failed");
assert_eq!(addr.ip().to_string(), "127.0.0.1");
assert_ne!(addr.port(), 0, "OS must assign a non-zero port");
});
}
#[test]
fn net_read_write_mut_ref() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let (done_tx, done_rx) = chan::<()>(1);
go!(move || {
let mut conn = listener.accept().unwrap();
let mut buf = [0u8; 64];
let n = conn.read(&mut buf).unwrap();
conn.write_all(&buf[..n]).unwrap();
done_tx.send(());
});
let mut client = TcpStream::connect(addr).unwrap();
client.write_all(b"hello").unwrap();
let mut resp = [0u8; 5];
client.read_exact(&mut resp).unwrap();
assert_eq!(&resp, b"hello");
done_rx.recv();
});
}
#[test]
fn net_read_write_shared_ref() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let (done_tx, done_rx) = chan::<()>(1);
go!(move || {
let conn = listener.accept().unwrap();
let mut buf = [0u8; 64];
let n = (&conn).read(&mut buf).unwrap();
(&conn).write_all(&buf[..n]).unwrap();
done_tx.send(());
});
let client = TcpStream::connect(addr).unwrap();
(&client).write_all(b"shared").unwrap();
let mut resp = [0u8; 6];
(&client).read_exact(&mut resp).unwrap();
assert_eq!(&resp, b"shared");
done_rx.recv();
});
}
#[test]
fn net_try_clone_split_halves() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let (done_tx, done_rx) = chan::<()>(1);
go!(move || {
let stream = listener.accept().unwrap();
let mut writer = stream.try_clone().expect("try_clone failed");
let mut buf = [0u8; 64];
let n = (&stream).read(&mut buf).unwrap();
writer.write_all(&buf[..n]).unwrap();
done_tx.send(());
});
let mut client = TcpStream::connect(addr).unwrap();
client.write_all(b"cloned").unwrap();
let mut resp = [0u8; 6];
client.read_exact(&mut resp).unwrap();
assert_eq!(&resp, b"cloned");
done_rx.recv();
});
}
#[test]
fn net_try_clone_separate_goroutines() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let (done_tx, done_rx) = chan::<()>(1);
go!(move || {
let stream = listener.accept().unwrap();
let writer = stream.try_clone().expect("try_clone failed");
let (relay_tx, relay_rx) = chan::<Vec<u8>>(1);
go!(move || {
let mut buf = [0u8; 64];
let n = (&stream).read(&mut buf).unwrap();
relay_tx.send(buf[..n].to_vec());
});
go!(move || {
let data = relay_rx.recv().unwrap();
(&writer).write_all(&data).unwrap();
done_tx.send(());
});
});
let mut client = TcpStream::connect(addr).unwrap();
client.write_all(b"split").unwrap();
let mut resp = [0u8; 5];
client.read_exact(&mut resp).unwrap();
assert_eq!(&resp, b"split");
done_rx.recv();
});
}
#[test]
fn net_peer_and_local_addr() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let listen_addr = listener.local_addr().unwrap();
let (addr_tx, addr_rx) = chan::<std::net::SocketAddr>(1);
go!(move || {
let conn = listener.accept().unwrap();
let local = conn.local_addr().expect("local_addr failed");
assert_eq!(local.port(), listen_addr.port());
let peer = conn.peer_addr().expect("peer_addr failed");
assert_ne!(peer.port(), 0);
addr_tx.send(peer);
});
let client = TcpStream::connect(listen_addr).unwrap();
let client_local = client.local_addr().expect("client local_addr failed");
let reported = addr_rx.recv().unwrap();
assert_eq!(reported.port(), client_local.port());
});
}
#[test]
fn net_bufreader_adapter() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let (done_tx, done_rx) = chan::<()>(1);
go!(move || {
let conn = listener.accept().unwrap();
let mut br = BufReader::new(conn);
let mut line = String::new();
br.read_line(&mut line).unwrap();
assert_eq!(line.trim_end(), "ping");
br.get_mut().write_all(b"pong\n").unwrap();
done_tx.send(());
});
let mut client = TcpStream::connect(addr).unwrap();
client.write_all(b"ping\n").unwrap();
let mut resp = String::new();
BufReader::new(client).read_line(&mut resp).unwrap();
assert_eq!(resp.trim_end(), "pong");
done_rx.recv();
});
}
#[test]
fn net_concurrent_connections() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
const N: usize = 8;
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let server_wg = Arc::new(WaitGroup::new());
let wg2 = Arc::clone(&server_wg);
go!(move || {
for _ in 0..N {
let conn = listener.accept().unwrap();
let wg3 = Arc::clone(&wg2);
wg3.add(1);
go!(move || {
let mut buf = [0u8; 4];
(&conn).read_exact(&mut buf).unwrap();
(&conn).write_all(&buf).unwrap();
wg3.done();
});
}
});
let results = Arc::new(Mutex::new(Vec::<bool>::new()));
let client_wg = Arc::new(WaitGroup::new());
for i in 0..N {
client_wg.add(1);
let results2 = Arc::clone(&results);
let client_wg2 = Arc::clone(&client_wg);
go!(move || {
let mut conn = TcpStream::connect(addr).unwrap();
let tag = [i as u8; 4];
conn.write_all(&tag).unwrap();
let mut resp = [0u8; 4];
conn.read_exact(&mut resp).unwrap();
results2.lock().unwrap().push(resp == tag);
client_wg2.done();
});
}
client_wg.wait();
server_wg.wait();
let ok = results.lock().unwrap();
assert_eq!(ok.len(), N, "wrong number of results");
assert!(ok.iter().all(|&b| b), "some echo checks failed");
});
}
#[test]
fn net_large_payload() {
let _g = NET_LOCK.lock().unwrap_or_else(|e| e.into_inner());
go_lib::run(|| {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
const SIZE: usize = 128 * 1024;
let payload: Vec<u8> = (0..SIZE).map(|i| (i % 251) as u8).collect();
let payload = Arc::new(payload);
let (done_tx, done_rx) = chan::<()>(1);
let payload2 = Arc::clone(&payload);
go!(move || {
let mut conn = listener.accept().unwrap();
let mut buf = vec![0u8; SIZE];
conn.read_exact(&mut buf).unwrap();
conn.write_all(&buf).unwrap();
done_tx.send(());
});
let mut client = TcpStream::connect(addr).unwrap();
client.write_all(&payload).unwrap();
let mut received = vec![0u8; SIZE];
client.read_exact(&mut received).unwrap();
assert_eq!(received, *payload2, "large payload echo mismatch");
done_rx.recv();
});
}