use aws_smithy_http_server::{routing::IntoMakeService, serve::IncomingStream};
use http::{Request, Response, StatusCode};
use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming};
use hyper_util::{
rt::{TokioExecutor, TokioIo, TokioTimer},
server::conn::auto::Builder,
service::TowerToHyperService,
};
use std::{convert::Infallible, sync::Arc, time::Duration};
use tokio::{net::TcpListener, sync::Semaphore};
use tower::{service_fn, ServiceBuilder, ServiceExt};
use tower_http::timeout::TimeoutLayer;
use tracing::{info, warn};
/// Responds to any request with a fixed plain-text greeting.
///
/// The incoming request is ignored entirely; the handler cannot fail.
async fn hello_handler(_req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    let greeting = Full::new(Bytes::from("Hello, World!\n"));
    Ok(Response::new(greeting))
}
async fn slow_handler(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
let body = req.into_body();
let bytes = match body.collect().await {
Ok(collected) => collected.to_bytes(),
Err(e) => {
warn!("slow handler: error reading body: {}", e);
return Ok(Response::new(Full::new(Bytes::from("Error reading body\n"))));
}
};
info!("slow handler: received {} bytes, sleeping for 45 seconds", bytes.len());
tokio::time::sleep(Duration::from_secs(45)).await;
if bytes.is_empty() {
Ok(Response::new(Full::new(Bytes::from("Completed after 45 seconds\n"))))
} else {
Ok(Response::new(Full::new(bytes)))
}
}
/// Dispatches a request by URI path.
///
/// The exact path `/slow` goes to `slow_handler`; every other path goes to
/// `hello_handler`.
async fn router(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
    let wants_slow = req.uri().path() == "/slow";
    if wants_slow {
        slow_handler(req).await
    } else {
        hello_handler(req).await
    }
}
/// Binds a TCP listener on `0.0.0.0:3000` and serves HTTP/1 and HTTP/2
/// connections until the process is terminated.
///
/// Each accepted connection is served on its own task, subject to:
/// - a cap on concurrent connections (excess connections are dropped),
/// - an HTTP/1 header read timeout,
/// - a per-request timeout that answers with `408 Request Timeout`,
/// - HTTP/2 keep-alive pings,
/// - an overall connection duration limit.
///
/// # Errors
///
/// Returns an error only if binding the listener or reading its local address
/// fails. Errors from `accept` are logged and the loop keeps running, so one
/// failed accept does not stop the server.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Single source of truth for the limits: these feed both the startup log
    // and the actual configuration, so the two cannot drift apart.
    const HEADER_READ_TIMEOUT: Duration = Duration::from_secs(10);
    const REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
    const CONNECTION_DURATION_LIMIT: Duration = Duration::from_secs(300);
    const MAX_CONCURRENT_CONNECTIONS: usize = 1000;
    const HTTP2_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(60);
    const HTTP2_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
    // Pause after an accept error that is not tied to a single connection,
    // so a persistent failure does not turn the loop into a busy spin.
    const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_secs(1);

    tracing_subscriber::fmt::init();

    let listener = TcpListener::bind("0.0.0.0:3000").await?;
    let local_addr = listener.local_addr()?;
    info!("Server listening on http://{}", local_addr);
    info!("Configuration:");
    info!(" - Header read timeout: {} seconds", HEADER_READ_TIMEOUT.as_secs());
    info!(" - Request timeout: {} seconds", REQUEST_TIMEOUT.as_secs());
    info!(
        " - Connection duration limit: {} minutes",
        CONNECTION_DURATION_LIMIT.as_secs() / 60
    );
    info!(" - Max concurrent connections: {}", MAX_CONCURRENT_CONNECTIONS);
    info!(
        " - HTTP/2 keep-alive: {}s interval, {}s timeout",
        HTTP2_KEEP_ALIVE_INTERVAL.as_secs(),
        HTTP2_KEEP_ALIVE_TIMEOUT.as_secs()
    );

    let connection_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_CONNECTIONS));
    let base_service = ServiceBuilder::new()
        .layer(TimeoutLayer::with_status_code(
            StatusCode::REQUEST_TIMEOUT,
            REQUEST_TIMEOUT,
        ))
        .service(service_fn(router));
    let make_service = IntoMakeService::new(base_service);

    loop {
        let (stream, remote_addr) = match listener.accept().await {
            Ok(accepted) => accepted,
            Err(e) => {
                warn!("failed to accept connection: {}", e);
                // These kinds concern only the connection that failed; the
                // listener can be polled again immediately. Anything else
                // gets a short pause before retrying.
                let per_connection = matches!(
                    e.kind(),
                    std::io::ErrorKind::ConnectionRefused
                        | std::io::ErrorKind::ConnectionAborted
                        | std::io::ErrorKind::ConnectionReset
                );
                if !per_connection {
                    tokio::time::sleep(ACCEPT_ERROR_BACKOFF).await;
                }
                continue;
            }
        };

        // Non-blocking acquire: when the cap is reached the new connection is
        // dropped instead of queued.
        let permit = match connection_semaphore.clone().try_acquire_owned() {
            Ok(permit) => permit,
            Err(_) => {
                warn!("connection limit reached, rejecting connection from {}", remote_addr);
                drop(stream);
                continue;
            }
        };
        info!("accepted connection from {}", remote_addr);

        let make_service = make_service.clone();
        tokio::spawn(async move {
            // Held for the lifetime of the task; dropping it frees the slot.
            let _permit = permit;
            let io = TokioIo::new(stream);

            let tower_service =
                match ServiceExt::oneshot(make_service, IncomingStream::<TcpListener> { io: &io, remote_addr }).await {
                    Ok(svc) => svc,
                    Err(_) => {
                        warn!("failed to create service for connection from {}", remote_addr);
                        return;
                    }
                };
            let hyper_service = TowerToHyperService::new(tower_service);

            let mut builder = Builder::new(TokioExecutor::new());
            builder
                .http1()
                .timer(TokioTimer::new())
                .header_read_timeout(HEADER_READ_TIMEOUT)
                .keep_alive(true);
            builder
                .http2()
                .timer(TokioTimer::new())
                .keep_alive_interval(HTTP2_KEEP_ALIVE_INTERVAL)
                .keep_alive_timeout(HTTP2_KEEP_ALIVE_TIMEOUT);

            let conn = builder.serve_connection(io, hyper_service);
            // When the limit elapses the connection future is dropped, which
            // ends the connection regardless of in-flight requests.
            match tokio::time::timeout(CONNECTION_DURATION_LIMIT, conn).await {
                Ok(Ok(())) => {
                    info!("connection from {} closed normally", remote_addr);
                }
                Ok(Err(e)) => {
                    warn!("error serving connection from {}: {:?}", remote_addr, e);
                }
                Err(_) => {
                    info!(
                        "connection from {} exceeded {} minutes duration limit",
                        remote_addr,
                        CONNECTION_DURATION_LIMIT.as_secs() / 60
                    );
                }
            }
        });
    }
}