#[cfg(any(feature = "tcp"))]
mod tests {
use tcp_stream::TLSConfig;
use nbio::{
tcp::{TcpServer, TcpSession},
Publish, PublishOutcome, Receive, ReceiveOutcome, Session, SessionStatus,
};
/// Streams a large payload from a client session to an accepted server
/// session over loopback and checks that every byte arrives intact and in
/// order.
///
/// The server side is drained while the client reports an incomplete publish,
/// and then drained until the full payload length has been received.
#[test]
pub fn tcp_client_server() {
    // Total number of bytes pushed through the connection.
    const PAYLOAD_LEN: usize = 9_999_999;

    let server = TcpServer::bind("127.0.0.1:33001").unwrap();
    let mut client = TcpSession::connect("127.0.0.1:33001").unwrap();

    // `accept` returns an `Option`, so poll until a session is handed out.
    let mut session = loop {
        if let Some((accepted, _)) = server.accept().unwrap() {
            break accepted;
        }
    };

    // Drive both ends until neither is still establishing.
    while client.status() == SessionStatus::Establishing
        || session.status() == SessionStatus::Establishing
    {
        client.drive().unwrap();
        session.drive().unwrap();
    }

    // The truncating cast is intentional: it yields a repeating 0..=255 byte
    // pattern, so reordered or dropped chunks show up in the final comparison.
    let publish_payload: Vec<u8> = (0..PAYLOAD_LEN).map(|i| i as u8).collect();
    let mut receive_buffer: Vec<u8> = Vec::with_capacity(PAYLOAD_LEN);

    // Publish until the client accepts the whole payload, reading on the
    // server side after every incomplete publish.
    let mut remaining = publish_payload.as_slice();
    while let PublishOutcome::Incomplete(unsent) = client.publish(remaining).unwrap() {
        remaining = unsent;
        if let ReceiveOutcome::Payload(chunk) = session.receive().unwrap() {
            receive_buffer.extend_from_slice(chunk);
        }
    }

    // Collect whatever has not been received yet.
    while receive_buffer.len() < publish_payload.len() {
        if let ReceiveOutcome::Payload(chunk) = session.receive().unwrap() {
            receive_buffer.extend_from_slice(chunk);
        }
    }

    assert_eq!(receive_buffer.len(), publish_payload.len());
    // Plain `assert!` keeps a failure from printing two ~10 MB vectors.
    assert!(
        receive_buffer == publish_payload,
        "received bytes differ from the published payload"
    );
}
/// Connects to `www.google.com:443`, waits for the plain TCP session to be
/// established, upgrades it to TLS, then sends an HTTP GET and checks that the
/// response starts with an `HTTP/1.1 ` status line.
///
/// NOTE(review): this test needs outbound network access and has no timeout.
#[test]
pub fn tcp_tls_after_establishing() {
    // Leading bytes every acceptable response must start with.
    const EXPECTED_PREFIX: &[u8] = b"HTTP/1.1 ";

    let mut client = TcpSession::connect("www.google.com:443").unwrap();
    while client.status() == SessionStatus::Establishing {
        client.drive().unwrap();
    }
    assert_eq!(client.status(), SessionStatus::Established);

    // Upgrade the already-established session and drive it again until the
    // TLS session reports it is established.
    let mut client = client
        .into_tls("www.google.com", TLSConfig::default())
        .unwrap();
    while client.status() == SessionStatus::Establishing {
        client.drive().unwrap();
    }
    assert_eq!(client.status(), SessionStatus::Established);

    let request: &[u8] = b"GET / HTTP/1.1\r\nhost: www.google.com\r\n\r\n";
    let mut remaining = request;
    // Unwrap publish errors so a failed send aborts the test immediately
    // instead of falling through to wait for a response.
    while let PublishOutcome::Incomplete(unsent) = client.publish(remaining).unwrap() {
        remaining = unsent;
        client.drive().unwrap();
    }

    // Read only as much as is needed to inspect the status-line prefix.
    let mut response: Vec<u8> = Vec::new();
    while response.len() < EXPECTED_PREFIX.len() {
        if let ReceiveOutcome::Payload(chunk) = client.receive().unwrap() {
            response.extend_from_slice(chunk);
        }
    }
    assert!(
        response.starts_with(EXPECTED_PREFIX),
        "unexpected response start: {:?}",
        String::from_utf8_lossy(&response)
    );
}
/// Connects to `www.google.com:443` and converts the session to TLS right
/// away, before it has been driven to the established state, then sends an
/// HTTP GET and checks that the response starts with an `HTTP/1.1 ` status
/// line.
///
/// NOTE(review): this test needs outbound network access and has no timeout.
#[test]
pub fn tcp_tls_before_establishing() {
    // Leading bytes every acceptable response must start with.
    const EXPECTED_PREFIX: &[u8] = b"HTTP/1.1 ";

    // Wrap in TLS immediately; the loop below drives the combined session
    // until it is no longer establishing.
    let mut client = TcpSession::connect("www.google.com:443")
        .unwrap()
        .into_tls("www.google.com", TLSConfig::default())
        .unwrap();
    while client.status() == SessionStatus::Establishing {
        client.drive().unwrap();
    }
    assert_eq!(client.status(), SessionStatus::Established);

    let request: &[u8] = b"GET / HTTP/1.1\r\nhost: www.google.com\r\n\r\n";
    let mut remaining = request;
    // Unwrap publish errors so a failed send aborts the test immediately
    // instead of falling through to wait for a response.
    while let PublishOutcome::Incomplete(unsent) = client.publish(remaining).unwrap() {
        remaining = unsent;
        client.drive().unwrap();
    }

    // Read only as much as is needed to inspect the status-line prefix.
    let mut response: Vec<u8> = Vec::new();
    while response.len() < EXPECTED_PREFIX.len() {
        if let ReceiveOutcome::Payload(chunk) = client.receive().unwrap() {
            response.extend_from_slice(chunk);
        }
    }
    assert!(
        response.starts_with(EXPECTED_PREFIX),
        "unexpected response start: {:?}",
        String::from_utf8_lossy(&response)
    );
}
/// Publishes many small messages to a server session that is only read when
/// the client reports an incomplete publish, asserts that such an incomplete
/// publish happened at least once, and checks that the total number of bytes
/// received matches the total number published.
#[test]
pub fn tcp_slow_consumer() {
    // Number of messages the client publishes.
    const MESSAGE_COUNT: usize = 100_000;

    let server = TcpServer::bind("127.0.0.1:33002").unwrap();
    let mut client = TcpSession::connect("127.0.0.1:33002").unwrap();

    // `accept` returns an `Option`, so poll until a session is handed out
    // rather than assuming the connection is already pending.
    let mut session = loop {
        if let Some((accepted, _)) = server.accept().unwrap() {
            break accepted;
        }
    };

    // Drive both ends until neither is still establishing.
    while client.status() == SessionStatus::Establishing
        || session.status() == SessionStatus::Establishing
    {
        client.drive().unwrap();
        session.drive().unwrap();
    }

    let mut received: Vec<u8> = Vec::new();
    // Sum of the published payload lengths, used instead of a hard-coded size.
    let mut expected_len = 0;
    let mut backpressure = false;
    for i in 0..MESSAGE_COUNT {
        let publish_payload = format!("test test test test hello world {i:06}!");
        expected_len += publish_payload.len();

        let mut remaining = publish_payload.as_bytes();
        while let PublishOutcome::Incomplete(unsent) = client.publish(remaining).unwrap() {
            remaining = unsent;
            backpressure = true;
            // The consumer reads only here, a bounded number of times per
            // incomplete publish.
            for _ in 0..10 {
                if let ReceiveOutcome::Payload(chunk) = session.receive().unwrap() {
                    received.extend_from_slice(chunk);
                }
            }
        }
    }
    // The test is only meaningful if the client saw an incomplete publish.
    assert!(backpressure);

    // Keep driving the client while collecting the rest on the server side.
    while received.len() < expected_len {
        client.drive().unwrap();
        if let ReceiveOutcome::Payload(chunk) = session.receive().unwrap() {
            received.extend_from_slice(chunk);
        }
    }
    assert_eq!(received.len(), expected_len);
}
}