use futures::select;
use futures::{FutureExt, Stream};
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll};
use tower::load::pending_requests::PendingRequests;
use tower_service::Service;
use crate::pool::{PoolConfig, balancer_pool};
use crate::{Connector, HealthChecker};
type ChannelInner<S, ReqBody> =
tower::buffer::Buffer<http::Request<ReqBody>, <S as Service<http::Request<ReqBody>>>::Future>;
pub struct Channel<S: Service<http::Request<ReqBody>>, ReqBody: Send + 'static>(
ChannelInner<S, ReqBody>,
);
impl<S, ReqBody: Send> Clone for Channel<S, ReqBody>
where
S: Service<http::Request<ReqBody>>,
S::Future: Send + 'static,
{
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
async fn client_worker<BW, W>(buffer_worker: BW, worker: W)
where
BW: Future + Send + 'static,
W: Future + Send + 'static,
{
let _ = tokio::spawn(async move {
let mut buffer_worker = pin!(buffer_worker.fuse());
let mut worker = pin!(worker.fuse());
select! {
_ = buffer_worker => (),
_ = worker => (),
}
})
.await;
}
pub(crate) type PoolService<A, ReqBody, C, HC> = crate::balance_driver::ShimmedService<
tower::balance::p2c::Balance<
crate::balance_driver::ShimmedStream<
crate::pool::DiscoverItem<
PendingRequests<crate::connection::HTTP2Connection<A, C, HC, ReqBody>>,
>,
>,
http::Request<ReqBody>,
>,
>;
pub fn pool_service<A, RS, RE, C, HC, HR, ReqBody, L>(
config: PoolConfig,
label: L,
connector: C,
resolution_stream: RS,
health_checker: HC,
healthy_callback: HR,
) -> (PoolService<A, ReqBody, C, HC>, impl Future<Output = ()>)
where
A: std::hash::Hash + std::fmt::Debug + Eq + Send + Sync + Clone + 'static,
RS: Stream<Item = Result<Vec<A>, RE>>,
RE: std::error::Error,
C: Connector<A> + Send + Sync + 'static,
C::IO: Send,
C::Error: Send + Sync + 'static,
HC: HealthChecker<ReqBody> + Send + Sync + 'static,
HC::Error: std::error::Error + Clone + Send + Sync,
HR: Fn(bool) + Send + 'static,
ReqBody: hyper::body::Body + Send + Unpin + 'static,
ReqBody::Data: Send,
ReqBody::Error: std::error::Error + Send + Sync,
L: AsRef<str>,
{
let discover = balancer_pool(
config,
label,
crate::connection::HTTP2ConnectionMaker::new(
connector,
health_checker,
tower::ServiceBuilder::new().layer_fn(|s| {
PendingRequests::new(s, tower::load::completion::CompleteOnResponse::default())
}),
),
crate::default_backoff(),
healthy_callback,
resolution_stream,
);
let mut shim = crate::balance_driver::Shim::new();
let stack = tower::balance::p2c::Balance::new(shim.stream());
shim.service_and_worker(stack, discover)
}
impl<S, ReqBody> Channel<S, ReqBody>
where
ReqBody: Send,
S: Service<http::Request<ReqBody>> + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>> + Send + Sync,
S::Future: Send + 'static,
{
pub fn new<W>(stack: S, worker: W) -> (Self, impl Future<Output = ()>)
where
W: Future + Send + 'static,
{
let (stack, buffer_worker) = tower::buffer::Buffer::pair(stack, 1024);
let worker = client_worker(buffer_worker, worker);
(Self(stack), worker)
}
}
impl<S, ReqBody> Service<http::Request<ReqBody>> for Channel<S, ReqBody>
where
ReqBody: Send,
S: Service<http::Request<ReqBody>>,
S::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
S::Future: Send + 'static,
{
type Response = <ChannelInner<S, ReqBody> as Service<http::Request<ReqBody>>>::Response;
type Error = <ChannelInner<S, ReqBody> as Service<http::Request<ReqBody>>>::Error;
type Future = <ChannelInner<S, ReqBody> as Service<http::Request<ReqBody>>>::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(&mut self.0, cx)
}
fn call(&mut self, request: http::Request<ReqBody>) -> Self::Future {
Service::call(&mut self.0, request)
}
}