use std::{net::SocketAddr, sync::Arc};
use super::StreamHandler;
use crate::{handler::GateWayHandler, route::GateWayRoute};
use gateway_common::{shutdown::Shutdown, FusenFuture};
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use tokio::{net::TcpStream, sync::mpsc::Sender};
use tracing::{debug, error};
/// Per-connection handler that serves one accepted TCP stream over HTTP/1.
///
/// All fields are consumed when the handler is run through its
/// `StreamHandler` implementation.
pub struct Http1StreamHandler {
// Accepted TCP connection that is handed to hyper.
stream: TcpStream,
// Socket address stored alongside the stream (presumably the remote peer —
// confirm with the caller); not read by the code in this file.
_addr: SocketAddr,
// Shared gateway handler used to build the per-connection `GateWayRoute`.
gateway_handler: Arc<GateWayHandler>,
// Shutdown listener that is raced against the connection future.
shutdown: Shutdown,
// Sender held until the handler finishes and then dropped; presumably lets
// the owner of the receiving side detect completion — confirm with the caller.
shutdown_tx: Sender<()>,
}
impl Http1StreamHandler {
pub fn new(
stream: TcpStream,
_addr: SocketAddr,
gateway_handler: Arc<GateWayHandler>,
shutdown: Shutdown,
shutdown_tx: Sender<()>,
) -> Self {
Self {
stream,
_addr,
gateway_handler,
shutdown,
shutdown_tx,
}
}
}
impl StreamHandler for Http1StreamHandler {
fn handler(self) -> FusenFuture<Result<(), gateway_common::error::BoxError>> {
Box::pin(async move {
let Http1StreamHandler {
stream,
_addr,
gateway_handler,
mut shutdown,
shutdown_tx,
} = self;
let hyper_io = TokioIo::new(stream);
let gateway_route =
GateWayRoute::new(gateway_handler, crate::protocol::ProtocolType::HTTP);
let future = http1::Builder::new()
.serve_connection(hyper_io, gateway_route)
.with_upgrades();
tokio::select! {
res = future =>
match res {
Ok(_) => debug!("client close ~"),
Err(error) => error!("client error info : {:?}",error),
}
,
_ = shutdown.recv() => debug!("http1 handler shutdown")
};
drop(shutdown_tx);
Ok(())
})
}
fn boxed(self) -> Box<dyn StreamHandler> {
Box::new(self)
}
}