warm_channels 0.3.0

Always-ready HTTP client channels for gRPC or other RPC-like requests.
Documentation
use futures::future::Either;
use http::StatusCode;
use hyper::service::service_fn;
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::io::DuplexStream;

/// Addresses understood by [`TestServer`]; each variant selects the outcome
/// of a connection attempt made through its `connect` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) enum TestServerAddress {
    /// Connecting succeeds and the returned stream is served by an
    /// in-memory HTTP/2 server.
    Working,
    /// Connecting completes immediately with a `ConnectionRefused` I/O error.
    FailToConnect,
    /// Connecting never completes: the returned future stays pending.
    HangsOnConnect,
    /// Connecting succeeds, but the peer half of the returned stream has
    /// already been dropped.
    BrokenStream,
}

/// Handle to an in-memory test server's health flag.
///
/// The flag lives behind an `Arc`, so every clone of a `TestServer` observes
/// and controls the same value.
#[derive(Debug, Clone)]
pub(crate) struct TestServer {
    /// Shared health flag: starts as `true`, is overwritten by
    /// `set_healthy`, and is read by the `/healthz` handler.
    healthy: Arc<AtomicBool>,
}

impl TestServer {
    /// Creates a test server whose health flag is initially `true`.
    pub(crate) fn new() -> Self {
        let healthy = Arc::new(AtomicBool::new(true));
        Self { healthy }
    }

    /// Overwrites the shared health flag with `h`.
    ///
    /// The store uses `Release` ordering, pairing with the `Acquire` load
    /// performed by the `/healthz` handler.
    pub(crate) fn set_healthy(&self, h: bool) {
        let flag: &AtomicBool = &self.healthy;
        flag.store(h, Ordering::Release);
    }
}

impl crate::Connector<TestServerAddress> for TestServer {
    type IO = DuplexStream;
    type Error = std::io::Error;

    fn connect(
        &self,
        addr: TestServerAddress,
    ) -> impl Future<Output = std::io::Result<DuplexStream>> + Send + Sync + 'static {
        match addr {
            TestServerAddress::Working => {
                let (s1, s2) = tokio::io::duplex(1000);
                let healthy = Arc::clone(&self.healthy);
                tokio::task::spawn(
                    hyper::server::conn::http2::Builder::new(TokioExecutor::new())
                        .serve_connection(
                            TokioIo::new(s2),
                            service_fn(move |r| {
                                let healthy = Arc::clone(&healthy);
                                async move {
                                    let mut resp = hyper::Response::new(String::new());
                                    *resp.status_mut() = match r.uri().path() {
                                        "/healthz" => {
                                            if healthy.load(Ordering::Acquire) {
                                                StatusCode::OK
                                            } else {
                                                StatusCode::INTERNAL_SERVER_ERROR
                                            }
                                        }
                                        "/success" => StatusCode::OK,
                                        _ => StatusCode::NOT_FOUND,
                                    };
                                    Ok::<_, std::convert::Infallible>(resp)
                                }
                            }),
                        ),
                );
                Either::Left(std::future::ready(Ok(s1)))
            }
            TestServerAddress::FailToConnect => Either::Left(std::future::ready(Err(
                std::io::Error::new(std::io::ErrorKind::ConnectionRefused, String::new()),
            ))),
            TestServerAddress::HangsOnConnect => Either::Right(std::future::pending()),
            TestServerAddress::BrokenStream => {
                let (s1, _) = tokio::io::duplex(1000);
                Either::Left(std::future::ready(Ok(s1)))
            }
        }
    }
}