use std::sync::Arc;
use bytesbuf::BytesView;
use bytesbuf::mem::GlobalPool;
use futures::TryStreamExt;
use http::Request;
use http_body_util::BodyExt;
use http_extensions::{HttpBodyBuilder, HttpBodyOptions, HttpRequest, HttpResponse, HttpResponseBuilder, RequestHandler};
use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server;
use layered::{Execute, Intercept, Stack};
use ohno::ErrorExt;
use tick::Clock;
use tokio::net::TcpListener;
/// Program entry point.
///
/// Assembles a two-part stack — an intercept layer that prints every request
/// and every result, followed by a handler that answers each request with the
/// text `Hello, World!` — and hands the resulting service to
/// [`serve_with_hyper`].
///
/// # Errors
///
/// Returns whatever error [`serve_with_hyper`] reports.
#[tokio::main]
async fn main() -> Result<(), ohno::AppError> {
    let body_builder = HttpBodyBuilder::new(GlobalPool::new(), &Clock::new_tokio());
    // The handler closure below takes ownership of `body_builder`, so keep a
    // second handle around for the serving loop.
    let serve_builder = body_builder.clone();

    // Observes traffic in both directions without modifying it.
    let logging = Intercept::layer()
        .on_input(|req: &HttpRequest| println!("received request, uri: {}", req.uri()))
        .on_output(|result: &http_extensions::Result<HttpResponse>| {
            match result {
                Err(error) => println!("response error, message: {}", error.message()),
                Ok(response) => println!("response produced, status: {}", response.status()),
            }
        });

    // Innermost handler: ignores the request and produces a fixed text response.
    let handler = Execute::new(move |_request: HttpRequest| {
        // Each returned future needs its own builder handle to own.
        let builder = body_builder.clone();
        async move { HttpResponseBuilder::new(&builder).text("Hello, World!").build() }
    });

    let stack = (logging, handler);
    serve_with_hyper(stack.into_service(), serve_builder).await
}
/// Converts a hyper [`Incoming`] body into an `http_extensions::HttpBody`.
///
/// The incoming body is turned into a data stream; every successful chunk is
/// converted into a [`BytesView`], and every stream error is wrapped with
/// `HttpError::other`, labelled `"hyper"` and given unknown recovery info.
/// The adapted stream is then passed to `body_builder` with default
/// [`HttpBodyOptions`].
fn map_incoming_to_http_body(incoming: Incoming, body_builder: &HttpBodyBuilder) -> http_extensions::HttpBody {
    let data_stream = incoming.into_data_stream();
    let views = data_stream.map_ok(BytesView::from);
    let adapted = views.map_err(|err| {
        http_extensions::HttpError::other(err, recoverable::RecoveryInfo::unknown(), "hyper")
    });

    let options = HttpBodyOptions::default();
    body_builder.stream(adapted, &options)
}
async fn serve_with_hyper<T: RequestHandler + 'static>(service: T, body_builder: HttpBodyBuilder) -> Result<(), ohno::AppError> {
let service = Arc::new(service);
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on: {}", listener.local_addr()?);
loop {
let (socket, _remote_addr) = listener.accept().await?;
let service_cloned = Arc::clone(&service);
let body_builder = body_builder.clone();
tokio::spawn(async move {
let hyper_service = hyper::service::service_fn(move |request: Request<Incoming>| {
let request = request.map(|incoming| map_incoming_to_http_body(incoming, &body_builder));
let service_cloned = Arc::clone(&service_cloned);
async move { service_cloned.execute(request).await }
});
let builder = server::conn::auto::Builder::new(TokioExecutor::new());
let socket = TokioIo::new(socket);
if let Err(e) = builder.serve_connection_with_upgrades(socket, hyper_service).await {
eprintln!("failed to serve connection: {e}");
}
});
}
}