use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_util::future::{self, BoxFuture};
use futures_util::{FutureExt as _, TryFutureExt as _};
use http::Uri;
use hyper_util::rt::TokioIo;
use log::info;
use tokio::net::TcpStream;
use tokio::time::Sleep;
use tower_service::Service;
use crate::handler::NewHandler;
use crate::test::async_test::{AsyncTestClient, AsyncTestServerInner};
use crate::test::{self, TestClient, TestServerData};
use std::time::Duration;
#[derive(Clone)]
pub struct TestServer {
data: Arc<TestServerData>,
}
impl test::Server for TestServer {
fn run_future<F, O>(&self, future: F) -> O
where
F: Future<Output = O>,
{
self.data.run_future(future)
}
fn request_expiry(&self) -> Sleep {
self.data.request_expiry()
}
}
impl TestServer {
pub fn new<NH: NewHandler + 'static>(new_handler: NH) -> anyhow::Result<TestServer> {
TestServer::with_timeout(new_handler, 10)
}
pub fn with_timeout<NH: NewHandler + 'static>(
new_handler: NH,
timeout: u64,
) -> anyhow::Result<TestServer> {
let data = TestServerData::new(new_handler, timeout, future::ok)?;
Ok(TestServer {
data: Arc::new(data),
})
}
pub fn client(&self) -> TestClient<Self, TestConnect> {
self.data.client(self)
}
pub fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.data.spawn(future)
}
}
#[derive(Clone)]
pub struct AsyncTestServer {
inner: Arc<AsyncTestServerInner>,
}
impl AsyncTestServer {
pub async fn new<NH: NewHandler + 'static>(new_handler: NH) -> anyhow::Result<AsyncTestServer> {
AsyncTestServer::new_with_timeout(new_handler, Duration::from_secs(10)).await
}
pub async fn new_with_timeout<NH: NewHandler + 'static>(
new_handler: NH,
timeout: Duration,
) -> anyhow::Result<AsyncTestServer> {
let inner = AsyncTestServerInner::new(new_handler, timeout, future::ok).await?;
Ok(AsyncTestServer {
inner: Arc::new(inner),
})
}
pub fn client(&self) -> AsyncTestClient<super::test::TestConnect> {
self.inner.client()
}
}
#[derive(Clone)]
pub struct TestConnect {
pub(crate) addr: SocketAddr,
}
impl Service<Uri> for TestConnect {
type Response = TokioIo<TcpStream>;
type Error = tokio::io::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<std::result::Result<(), Self::Error>> {
Ok(()).into()
}
fn call(&mut self, _req: Uri) -> Self::Future {
TcpStream::connect(self.addr)
.inspect(|s| info!("Client TcpStream connected: {:?}", s))
.map_ok(TokioIo::new)
.boxed()
}
}
impl From<SocketAddr> for TestConnect {
fn from(addr: SocketAddr) -> Self {
Self { addr }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test::helper::TestHandler;
use crate::test::{self, async_test, Server};
use tokio::sync::oneshot;
#[test]
fn test_server_serves_requests() {
test::common_tests::serves_requests(TestServer::new, TestServer::client)
}
#[test]
fn test_server_times_out() {
test::common_tests::times_out(TestServer::with_timeout, TestServer::client)
}
#[test]
fn test_server_async_echo() {
test::common_tests::async_echo(TestServer::new, TestServer::client)
}
#[test]
fn test_server_supports_multiple_servers() {
test::common_tests::supports_multiple_servers(TestServer::new, TestServer::client)
}
#[test]
fn test_server_spawns_and_runs_futures() {
let server = TestServer::new(TestHandler::default()).unwrap();
let (sender, spawn_receiver) = oneshot::channel();
let (spawn_sender, run_receiver) = oneshot::channel();
sender.send(1).unwrap();
server.spawn(async move {
assert_eq!(1, spawn_receiver.await.unwrap());
spawn_sender.send(42).unwrap();
});
assert_eq!(42, server.run_future(run_receiver).unwrap());
}
#[test]
fn test_server_adds_client_address_to_state() {
test::common_tests::adds_client_address_to_state(TestServer::new, TestServer::client);
}
#[tokio::test]
async fn async_test_server_serves_requests() {
async_test::common_tests::serves_requests(AsyncTestServer::new, AsyncTestServer::client)
.await;
}
#[tokio::test]
async fn async_test_server_times_out() {
async_test::common_tests::times_out(
AsyncTestServer::new_with_timeout,
AsyncTestServer::client,
)
.await;
}
#[tokio::test]
async fn async_test_server_echo() {
async_test::common_tests::echo(AsyncTestServer::new, AsyncTestServer::client).await;
}
#[tokio::test]
async fn async_test_server_supports_multiple_servers() {
async_test::common_tests::supports_multiple_servers(
AsyncTestServer::new,
AsyncTestServer::client,
)
.await;
}
#[tokio::test]
async fn async_test_server_adds_client_address_to_state() {
async_test::common_tests::adds_client_address_to_state(
AsyncTestServer::new,
AsyncTestServer::client,
)
.await;
}
}