beatrice 0.3.2

This library was renamed to Servlin.
Documentation
mod test_util;

use crate::test_util::{async_test, connected_streams};
use beatrice::internal::{handle_http_conn, Token};
use beatrice::{HttpConn, Request, Response};
use futures_lite::{AsyncReadExt, AsyncWriteExt};
use permit::Permit;
use std::future::Future;
use std::io::ErrorKind;
use std::net::Shutdown;
use std::time::{Duration, Instant};
use temp_dir::TempDir;

const MILLIS_100: Duration = Duration::from_millis(100);

async fn handle_http_conn_task<F, Fut>(request_handler: F) -> async_net::TcpStream
where
    Fut: Future<Output = Response> + std::marker::Send,
    F: FnOnce(Request) -> Fut + 'static + Send + Sync + Clone,
{
    let (stream0, stream1) = connected_streams().await;
    let addr = stream1.local_addr().unwrap();
    let temp_dir = TempDir::new().unwrap();
    safina_executor::spawn(async move {
        handle_http_conn(
            Permit::new(),
            Token::new(),
            HttpConn::new(addr, stream0),
            Some(temp_dir.path().to_path_buf()),
            64 * 1024,
            request_handler,
        )
        .await;
    });
    stream1
}

async fn read_response(
    stream: &mut async_net::TcpStream,
    timeout: Duration,
) -> Result<String, std::io::Error> {
    let deadline = Instant::now() + timeout;
    let mut buf = Vec::new();
    while Instant::now() < deadline {
        let mut chunk = [0_u8; 1024];
        let result = safina_timer::with_timeout(
            async { stream.read(&mut chunk).await },
            Duration::from_millis(10),
        )
        .await;
        match result {
            Err(safina_timer::DeadlineExceeded) => {}
            Ok(Ok(0)) => break,
            Ok(Ok(num_read)) => buf.extend(&chunk[..num_read]),
            Ok(Err(e)) => return Err(e),
        };
    }
    Ok(String::from_utf8(buf)
        .map_err(|_| std::io::Error::new(ErrorKind::InvalidData, "response is not UTF-8"))?)
}

#[test]
fn handle_http_conn_ok() {
    async_test(async {
        let mut stream =
            handle_http_conn_task(|_req: Request| async { Response::text(200, "ok") }).await;
        stream.write_all(b"M / HTTP/1.1\r\n\r\n").await.unwrap();
        assert_eq!(
            "HTTP/1.1 200 OK\r\ncontent-type: text/plain; charset=UTF-8\r\ncontent-length: 2\r\n\r\nok",
            read_response(&mut stream, MILLIS_100)
                .await
                .unwrap()
                .as_str()
        );
    });
}

#[test]
fn handle_http_conn_shutdown() {
    async_test(async {
        let mut stream =
            handle_http_conn_task(|_req: Request| async { Response::text(200, "ok") }).await;
        stream.shutdown(Shutdown::Write).unwrap();
        assert_eq!(
            "",
            read_response(&mut stream, MILLIS_100)
                .await
                .unwrap()
                .as_str()
        );
    });
}

#[test]
fn handle_http_conn_upload() {
    async_test(async {
        let mut stream = handle_http_conn_task(|req: Request| async move {
            let mut body_string = String::new();
            req.body
                .async_reader()
                .await
                .unwrap()
                .read_to_string(&mut body_string)
                .await
                .unwrap();
            Response::text(200, format!("read {:?}", body_string))
        })
        .await;
        stream
            .write_all(b"M / HTTP/1.1\r\ncontent-length:3\r\n\r\nabc")
            .await
            .unwrap();
        assert_eq!(
            "HTTP/1.1 200 OK\r\ncontent-type: text/plain; charset=UTF-8\r\ncontent-length: 10\r\n\r\nread \"abc\"",
            read_response(&mut stream, MILLIS_100)
                .await
                .unwrap()
                .as_str()
        );
    });
}

#[test]
fn handle_http_conn_upload_large() {
    async_test(async {
        let mut stream = handle_http_conn_task(|req: Request| async move {
            if req.body.is_pending() {
                return Response::get_body_and_reprocess(10_000_000);
            }
            let mut body_string = String::new();
            req.body
                .async_reader()
                .await
                .unwrap()
                .read_to_string(&mut body_string)
                .await
                .unwrap();
            Response::text(200, format!("got {}", body_string.len()))
        })
        .await;
        stream
            .write_all(b"M / HTTP/1.1\r\ncontent-length:10000000\r\n\r\n")
            .await
            .unwrap();
        for _ in 0..10_000 {
            stream.write_all(&[b'a'; 1000]).await.unwrap();
        }
        stream.flush().await.unwrap();
        assert_eq!(
            "HTTP/1.1 200 OK\r\ncontent-type: text/plain; charset=UTF-8\r\ncontent-length: 12\r\n\r\ngot 10000000",
            read_response(&mut stream, Duration::from_secs(3))
                .await
                .unwrap()
                .as_str()
        );
    });
}