//! HTTP/2 relay: accepts TCP connections and forwards incoming requests through `http_pool` pools.
use crate::macros::{process_impl, relay_impl, serve2_connection_impl};
use http_pool::http2::HttpPool;
use http_pool::net_pool::{
    Pool, Pools, debug, error, info, instrument_debug_span, tokio_spawn, warn2,
};
use hyper::body::{Body, Incoming};
use hyper::{Request, Response};
use net_relay::Builder;
use std::error::Error;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

/// HTTP/2 relay.
///
/// Type parameters:
/// - `F`: the relay function, called with the connection pools and an incoming request.
/// - `Ret`: the future returned by `F`.
/// - `P`: the pool type; defaults to `http_pool::http2::Pool`.
pub struct Relay<F, Ret, P = http_pool::http2::Pool>
where
    F: Fn(Pools<P>, Request<Incoming>) -> Ret,
    Ret: Future,
    P: Pool + HttpPool,
{
    /// Builder parts (`net_relay::builder::Parts`); `max_streams` is read from here.
    parts: net_relay::builder::Parts<P, F>,
    /// The server future driven by `poll_run`; it is created on demand when absent.
    pending: Option<Pin<Box<dyn Future<Output = Result<(), net_relay::Error>> + Send + 'static>>>,
    /// Pools that are cloned into the server future and passed on to each accepted connection.
    pools: Pools<P>,
}

impl<F, Ret, P> Relay<F, Ret, P>
where
    F: Fn(Pools<P>, Request<Incoming>) -> Ret,
    Ret: Future,
    P: Pool + HttpPool,
{
    relay_impl! {}

    /// Maximum number of concurrent streams.
    ///
    /// The value can be set through the `Builder` before the service is constructed.
    /// `None` (the default) means unlimited.
    pub fn max_streams(&self) -> Option<u32> {
        // `Option<u32>` is `Copy`, so a plain read is enough; no `clone()` needed.
        self.parts.max_streams
    }
}

impl<F, Ret, ResBody, E, P> net_relay::Relay for Relay<F, Ret, P>
where
    F: Fn(Pools<P>, Request<Incoming>) -> Ret + Send + Sync + 'static,
    Ret: Future<Output = Result<Response<ResBody>, E>> + Send + 'static,
    ResBody: Body + 'static + Send + Sync,
    ResBody::Error: Into<Box<dyn Error + Send + Sync>>,
    ResBody::Data: Send,
    E: Into<Box<dyn Error + Send + Sync>>,
    P: Pool + HttpPool + Send + 'static,
{
    fn poll_run(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), net_relay::Error>> {
        if self.pending.is_none() {
            let tuple = (
                self.bind_addrs().clone(),
                self.relay_fn(),
                self.pools.clone(),
                self.max_streams(),
            );

            self.pending = Some(Box::pin(async move {
                let listener = tokio::net::TcpListener::bind(tuple.0.as_slice()).await?;

                info!(
                    "listen on: {:?}",
                    listener.local_addr().unwrap()
                );

                loop {
                    match listener.accept().await {
                        Err(_e) => {
                            error!(
                                "failed to accept listen, cause error: {:?}",
                                _e
                            );
                        }
                        Ok((client, _addr)) => {
                            let fut = serve2_connection_impl! {tuple.2, tuple.1, client, tuple.3};
                            tokio_spawn! {
                                instrument_debug_span! {
                                    fut,
                                    "new_stream",
                                    address=_addr.to_string()
                                }
                            };
                        }
                    }
                }
            }));
        }

        self.pending.as_mut().unwrap().as_mut().poll(cx)
    }
}

/// 默认的relay_fn实现
pub async fn default_relay_fn(
    pools: Pools<http_pool::http2::Pool>,
    mut req: Request<Incoming>,
) -> Result<Response<http_pool::body::VariantBody>, net_relay::Error> {
    let _uri = req.uri().clone();

    let process = async || {
        let path = req.uri().path();
        if let Some(pool) = pools.get_pool(path) {
            let mut sender = pool
                .get(&path)
                .await
                .map_err(|e| net_relay::Error::from_other(e))?;

            let uri = sender
                .new_uri(req.uri())
                .map_err(|e| net_relay::Error::from_other(e))?;

            // warn!!!
            // 转发Response<Incoming>的时候可能会导致多一个空data帧.且没有trailer,最后一个header帧也没有设置end_stream标识,导致
            // 有些grpc客户端无法解析出错误码
            *req.uri_mut() = uri;
            sender
                .send_request(req.map(|b| http_pool::body::variant_body(b)))
                .await
                .map_err(|e| net_relay::Error::from_other(e))
        } else {
            Err(net_relay::Error::NotMatchPath)
        }
    };

    match process().await {
        Ok(r) => Ok(r.map(|b| http_pool::body::variant_body(b))),
        Err(_e) => {
            debug!(
                "send response uri: {:?}, error occurred: {:?}",
                _uri, _e
            );
            Ok(crate::util::bad_gateway())
        }
    }
}