1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use std::{net::SocketAddr, sync::Arc};

use super::StreamHandler;
use crate::{handler::GateWayHandler, route::GateWayRoute};
use gateway_common::{shutdown::Shutdown, FusenFuture};
use hyper::server::conn::http1;
use hyper_util::rt::TokioIo;
use tokio::{net::TcpStream, sync::mpsc::Sender};
use tracing::{debug, error};

pub struct Http1StreamHandler {
    stream: TcpStream,
    _addr: SocketAddr,
    gateway_handler: Arc<GateWayHandler>,
    shutdown: Shutdown,
    shutdown_tx: Sender<()>,
}

impl Http1StreamHandler {
    pub fn new(
        stream: TcpStream,
        _addr: SocketAddr,
        gateway_handler: Arc<GateWayHandler>,
        shutdown: Shutdown,
        shutdown_tx: Sender<()>,
    ) -> Self {
        Self {
            stream,
            _addr,
            gateway_handler,
            shutdown,
            shutdown_tx,
        }
    }
}

impl StreamHandler for Http1StreamHandler {
    fn handler(self) -> FusenFuture<Result<(), gateway_common::error::BoxError>> {
        Box::pin(async move {
            let Http1StreamHandler {
                stream,
                _addr,
                gateway_handler,
                mut shutdown,
                shutdown_tx,
            } = self;
            let hyper_io = TokioIo::new(stream);
            let gateway_route =
                GateWayRoute::new(gateway_handler, crate::protocol::ProtocolType::HTTP);
            let future = http1::Builder::new()
                .serve_connection(hyper_io, gateway_route)
                .with_upgrades();
            tokio::select! {
                    res = future =>
                        match res {
                            Ok(_) =>  debug!("client close ~"),
                            Err(error) => error!("client error info : {:?}",error),
                        }
                     ,
                    _ = shutdown.recv() => debug!("http1 handler shutdown")
            };
            drop(shutdown_tx);
            Ok(())
        })
    }

    fn boxed(self) -> Box<dyn StreamHandler> {
        Box::new(self)
    }
}