use crate::macros::{process_impl, relay_impl, serve1_connection_impl};
use http_pool::http1::HttpPool;
use http_pool::net_pool::{
Pool, Pools, debug, error, info, instrument_debug_span, tokio_spawn, warn2,
};
use hyper::body::{Body, Incoming};
use hyper::{Request, Response};
use net_relay::Builder;
use std::error::Error;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
/// HTTP/1 relay that listens on TCP and hands every incoming request to a
/// user-supplied handler together with the shared connection pools.
///
/// Type parameters:
/// * `F` - request handler; per the bound below it is called with a
///   `Pools<P>` and the incoming `Request<Incoming>` and returns `Ret`.
/// * `Ret` - the future returned by `F`.
/// * `P` - pool implementation, defaulting to `http_pool::http1::Pool`.
pub struct Relay<F, Ret, P = http_pool::http1::Pool>
where
F: Fn(Pools<P>, Request<Incoming>) -> Ret,
Ret: Future,
P: Pool + HttpPool,
{
// NOTE(review): presumably the configuration produced by the `net_relay`
// builder (bind addresses, handler) - confirm against `net_relay::builder::Parts`.
parts: net_relay::builder::Parts<P, F>,
// Boxed accept-loop future. `poll_run` creates it when this is still `None`
// and then polls it on every call.
pending: Option<Pin<Box<dyn Future<Output = Result<(), net_relay::Error>> + Send + 'static>>>,
// Pools handle; `poll_run` clones it into the accept loop, where it is passed
// to `serve1_connection_impl!` for each accepted connection.
pools: Pools<P>,
}
// Inherent methods of `Relay`; the whole body is generated by the crate-local
// `relay_impl!` macro.
// NOTE(review): the `net_relay::Relay` impl in this file calls
// `self.bind_addrs()` and `self.relay_fn()`, which are not defined in the
// visible source and presumably come from this expansion - confirm in
// `crate::macros`.
impl<F, Ret, P> Relay<F, Ret, P>
where
F: Fn(Pools<P>, Request<Incoming>) -> Ret,
Ret: Future,
P: Pool + HttpPool,
{
relay_impl! {}
}
impl<F, Ret, ResBody, E, P> net_relay::Relay for Relay<F, Ret, P>
where
F: Fn(Pools<P>, Request<Incoming>) -> Ret + Send + Sync + 'static,
Ret: Future<Output = Result<Response<ResBody>, E>> + Send + 'static,
ResBody: Body + 'static + Send + Sync,
ResBody::Error: Into<Box<dyn Error + Send + Sync>>,
ResBody::Data: Send,
E: Into<Box<dyn Error + Send + Sync>>,
P: Pool + HttpPool + Send + 'static,
{
fn poll_run(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), net_relay::Error>> {
if self.pending.is_none() {
let tuple = (
self.bind_addrs().clone(),
self.relay_fn(),
self.pools.clone(),
);
self.pending = Some(Box::pin(async move {
let listener = tokio::net::TcpListener::bind(tuple.0.as_slice()).await?;
info!(
"listen on: {:?}",
listener.local_addr().unwrap()
);
loop {
match listener.accept().await {
Err(_e) => {
error!(
"failed to accept from listen, error occurred: {:?}",
_e
);
}
Ok((client, _addr)) => {
let fut = serve1_connection_impl! {tuple.2, tuple.1,client};
tokio_spawn! {
instrument_debug_span! {
fut,
"new_stream",
address=_addr.to_string()
}
};
}
}
}
}));
}
self.pending.as_mut().unwrap().as_mut().poll(cx)
}
}
pub async fn default_relay_fn(
pools: Pools<http_pool::http1::Pool>,
mut req: Request<Incoming>,
) -> Result<Response<http_pool::body::VariantBody>, net_relay::Error> {
let _uri = req.uri().clone();
debug!("recv request uri: {:?}", _uri);
let process = async move || {
let path = req.uri().path();
if let Some(pool) = pools.get_pool(path) {
let mut sender = pool
.get(path)
.await
.map_err(|e| net_relay::Error::from_other(e))?;
let uri = sender
.new_uri(req.uri())
.map_err(|e| net_relay::Error::from_other(e))?;
*req.uri_mut() = uri;
sender
.send_request(req.map(|b| http_pool::body::variant_body(b)))
.await
.map_err(|e| net_relay::Error::from_other(e))
} else {
Err(net_relay::Error::NotMatchPath)
}
};
match process().await {
Ok(r) => Ok(r.map(|b| http_pool::body::variant_body(b))),
Err(_e) => {
debug!(
"send response uri: {:?}, error occurred: {:?}",
_uri, _e
);
Ok(crate::util::bad_gateway())
}
}
}