use std::sync::Arc;
use std::time::Duration;
use tokio::{
fs,
net::UdpSocket,
sync::Notify,
time::timeout,
};
use pasque::{
client::PsqClient,
server::{
config::Config,
PsqServer
},
stream::{
filestream::{FileStream, Files},
udptunnel::{UdpEndpoint, UdpTunnel},
PsqStream,
}, test_utils::init_logger, PsqError
};
#[tokio::test]
async fn test_get_request() {
init_logger();
let addr = "127.0.0.1:8888";
let config = Config::create_default();
let server = tokio::spawn(async move {
let mut psqserver = PsqServer::start(
addr,
&config,
).await.unwrap();
psqserver.add_endpoint(
"files",
Files::new(".")).await;
loop {
psqserver.process().await.unwrap();
}
});
tokio::time::sleep(Duration::from_millis(100)).await;
let mut psqconn = PsqClient::connect(
format!("https://{}/", addr).as_str(),
true,
).await.unwrap();
let ret = FileStream::get(
&mut psqconn,
"files/Cargo.toml",
"testout",
).await;
assert!(ret.is_ok());
let srclen = fs::metadata("Cargo.toml").await.unwrap().len();
let dstlen = fs::metadata("testout").await.unwrap().len();
let ret = ret.unwrap();
assert!(srclen == dstlen && srclen == ret as u64);
let ret = FileStream::get(
&mut psqconn,
"files/nonexisting",
"testout",
).await;
assert!(ret.is_err());
let ret = FileStream::get(
&mut psqconn,
"nonexisting",
"testout",
).await;
assert!(ret.is_err());
std::fs::remove_file("testout").unwrap();
server.abort();
}
async fn run_server(addr: &str, shutdown: Arc<Notify>) {
let config = Config::create_default();
let mut psqserver = PsqServer::start(addr, &config).await.unwrap();
psqserver.add_endpoint(
"udp",
UdpEndpoint::new(),
).await;
loop {
tokio::select! {
_ = shutdown.notified() => {
break;
}
result = psqserver.process() => {
result.unwrap();
}
}
}
}
async fn run_client(mut psqclient: PsqClient, shutdown: Arc<Notify>) {
loop {
tokio::select! {
_ = shutdown.notified() => {
break;
}
result = psqclient.process() => {
result.unwrap();
}
}
}
}
async fn run_udpserver(udpsocket: UdpSocket, shutdown: Arc<Notify>) {
loop {
let mut buf = [0u8; 2000];
tokio::select! {
_ = shutdown.notified() => {
break;
}
result = udpsocket.recv_from(&mut buf) => {
let (n, addr) = result.unwrap();
udpsocket.send_to(&buf[..n], addr).await.unwrap();
}
}
}
}
#[tokio::test]
async fn test_udp_tunnel() {
init_logger();
let addr = "127.0.0.1:7000";
let server_notify = Arc::new(Notify::new());
let server = tokio::spawn(run_server(addr, server_notify.clone()));
tokio::time::sleep(Duration::from_millis(100)).await;
let mut psqclient = PsqClient::connect(
format!("https://{}/", addr).as_str(),
true,
).await.unwrap();
let ret = FileStream::get(
&mut psqclient,
"udp",
"testout",
).await;
assert!(matches!(ret, Err(PsqError::HttpResponse(405, _))));
let udptunnel = UdpTunnel::connect(
&mut psqclient,
"udp",
"127.0.0.1",
9002,
"127.0.0.1:0".parse().unwrap(),
).await.unwrap();
let tunneladdr = udptunnel.sockaddr().unwrap();
let client_notify = Arc::new(Notify::new());
let client = tokio::spawn(run_client(psqclient, client_notify.clone()));
let udpsocket = UdpSocket::bind("127.0.0.1:9002").await.unwrap();
let udpserver_notify = Arc::new(Notify::new());
let udpserver = tokio::spawn(run_udpserver(udpsocket, udpserver_notify.clone()));
tokio::time::sleep(Duration::from_millis(100)).await;
let result = timeout(Duration::from_millis(1000), async {
let udpclient = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mut buf = [0u8; 2000];
udpclient.send_to(b"Testing", tunneladdr).await.unwrap();
let (n, _) = udpclient.recv_from(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"Testing");
}).await;
assert!(result.is_ok(), "Test timed out");
udpserver_notify.notify_one();
udpserver.await.unwrap();
client_notify.notify_one();
client.await.unwrap();
server_notify.notify_one();
server.await.unwrap();
}
#[tokio::test]
async fn tunnel_closing() {
init_logger();
let addr = "127.0.0.1:9003";
let server = tokio::spawn(async move {
let config = Config::create_default();
let mut psqserver = PsqServer::start(addr, &config).await.unwrap();
psqserver.add_endpoint(
"udp",
UdpEndpoint::new(),
).await;
loop {
psqserver.process().await.unwrap();
}
});
tokio::time::sleep(Duration::from_millis(100)).await;
let mut psqconn = PsqClient::connect(
format!("https://{}/", addr).as_str(),
true,
).await.unwrap();
let _result = timeout(Duration::from_millis(1000), async {
let udptunnel = UdpTunnel::connect(
&mut psqconn,
"udp",
"127.0.0.1",
9004,
"127.0.0.1:0".parse().unwrap(),
).await.unwrap();
let tunneladdr = udptunnel.sockaddr().unwrap();
let stream_id = udptunnel.stream_id();
let client1 = tokio::spawn(async move {
let udpclient = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mut buf = [0u8; 2000];
udpclient.send_to(b"Testing", tunneladdr).await.unwrap();
let (n, _) = udpclient.recv_from(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"Testing");
});
tokio::time::sleep(Duration::from_millis(100)).await;
psqconn.remove_stream(stream_id).await;
psqconn.process().await.unwrap();
psqconn.remove_stream(stream_id).await;
client1.abort();
}).await;
server.abort();
}