use futures::Stream;
use std::future::Future;
use crate::Connector;
use crate::util::AssumeAlwaysHealthy;
/// Channel type returned by [`http_channel`].
///
/// Expands to a `crate::channel::Channel` wrapping a
/// `crate::channel::PoolService` whose last type argument is fixed to
/// [`AssumeAlwaysHealthy`].
///
/// Type parameters, as constrained by [`http_channel`]:
/// - `A`: the item type of the lists yielded by the resolution stream and the
///   type the connector is parameterised over (`C: Connector<A>`) —
///   presumably an endpoint address.
/// - `C`: the connector type.
/// - `ReqBody`: the request body type (bounded by `hyper::body::Body` in
///   [`http_channel`]).
pub type HTTPChannel<A, C, ReqBody> = crate::channel::Channel<
crate::channel::PoolService<A, ReqBody, C, AssumeAlwaysHealthy>,
ReqBody,
>;
/// Configuration accepted by [`http_channel`].
///
/// The derived `Default` is built from `PoolConfig`'s own `Default`.
#[derive(Clone, Debug, Default)]
pub struct HTTPChannelConfig {
/// Pool settings; [`http_channel`] passes this value unchanged as the first
/// argument of `crate::channel::pool_service`.
pub pool: crate::pool::PoolConfig,
}
pub fn http_channel<A, RS, RE, C, ReqBody, L>(
config: HTTPChannelConfig,
label: L,
connector: C,
resolution_stream: RS,
) -> (HTTPChannel<A, C, ReqBody>, impl Future<Output = ()>)
where
A: std::hash::Hash + Send + Sync + std::fmt::Debug + Eq + Clone + 'static,
RS: Stream<Item = Result<Vec<A>, RE>> + Send + 'static,
RE: std::error::Error + Send + 'static,
C: Connector<A> + Send + Sync + 'static,
C::IO: Send,
C::Error: Send + Sync + 'static,
ReqBody: hyper::body::Body + Send + Unpin,
ReqBody::Error: std::error::Error + Send + Sync,
ReqBody::Data: Send,
L: AsRef<str> + Send + 'static,
{
#[allow(clippy::default_constructed_unit_structs)]
let health_checker = AssumeAlwaysHealthy::default();
let (stack, shim_worker) = crate::channel::pool_service(
config.pool,
label,
connector,
resolution_stream,
health_checker,
|_| (),
);
crate::channel::Channel::new(stack, shim_worker)
}
#[cfg(test)]
mod tests {
    use futures::FutureExt;
    use futures::future::Either;
    use std::pin::pin;
    use tower::ServiceExt;
    use tower_service::Service;

    use super::*;
    use crate::testutil::{TestServer, TestServerAddress};

    /// Sends one request through a channel backed by `TestServer` and checks
    /// for a 200 response, then verifies the worker future finishes once the
    /// channel has been dropped.
    #[tokio::test]
    async fn end_to_end_success() {
        // A resolution stream that yields exactly one list of addresses.
        let resolved = Ok::<_, std::convert::Infallible>(vec![TestServerAddress::Working]);
        let resolution = futures::stream::once(futures::future::ready(resolved));

        let (channel, worker) = http_channel(
            HTTPChannelConfig::default(),
            "test",
            TestServer::new(),
            resolution,
        );

        let request = http::Request::builder()
            .uri("http://nowhere/success")
            .body(String::new())
            .expect("trivial test request");

        // The request runs on a clone so the original handle can be dropped
        // explicitly at the end of the test.
        let mut sender = channel.clone();
        let exchange = pin!(async move {
            let ready = sender.ready().await.expect("ready");
            ready.call(request).await.expect("successful request")
        });
        let driver = pin!(worker.fuse());

        // Race the request against the worker; the worker finishing first
        // means the request could never complete.
        let outcome = futures::future::select(exchange, driver).await;
        let Either::Left((response, driver)) = outcome else {
            panic!("lost worker");
        };
        assert_eq!(response.status(), 200);

        // With the last channel handle gone the worker is expected to finish.
        drop(channel);
        driver.await;
    }
}