// chaussette 0.1.1
//
// SOCKS5 to HTTP CONNECT Proxy
// Documentation
use futures_util::{select, FutureExt};
use http::{Request, Response, Uri};
use hyper::body::Incoming;
use hyper_boring::v1::HttpsConnector;
use hyper_util::{
    client::legacy::connect::HttpConnector,
    rt::{TokioExecutor, TokioIo},
};
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tower::{
    retry::backoff::{Backoff, ExponentialBackoffMaker, MakeBackoff},
    util::rng::HasherRng,
    BoxError, Service,
};

/// An outgoing proxy request: an HTTP request carrying an empty body.
type ProxyRequest = Request<http_body_util::Empty<&'static [u8]>>;

/// One-shot channel end through which the outcome of a request is handed back to its caller.
type ProxyReplySender = oneshot::Sender<hyper::Result<Response<Incoming>>>;

/// Cloneable handle used to submit requests to the background task that owns the proxy
/// connection.
///
/// Every submission pairs the request with the one-shot sender on which its response
/// (or the hyper error) is delivered.
#[derive(Clone)]
pub(crate) struct ProxyClient {
    /// Queue feeding the background task spawned by `ProxyClient::new`.
    tx: mpsc::Sender<(ProxyRequest, ProxyReplySender)>,
}

impl ProxyClient {
    /// Creates a client and spawns the background task that owns the proxy connection.
    ///
    /// The task receives queued requests one at a time, obtains a usable HTTP/2 sender
    /// through `get_proxy_request_sender` (reconnecting when needed) and then spawns the
    /// actual exchange, so a slow response does not hold back the requests behind it.
    /// The task stops once every `ProxyClient` clone has been dropped.
    ///
    /// # Panics
    ///
    /// Panics when called outside of a Tokio runtime, because it uses `tokio::spawn`.
    pub(crate) fn new(mut connector: HttpsConnector<HttpConnector>, proxy: Uri) -> Self {
        let (tx, mut rx) = mpsc::channel(64);

        // Built before the task is spawned so the channel's item type is already known
        // when the task body is type-checked.
        let client = Self { tx };

        tokio::spawn(async move {
            let mut request_sender = None;

            while let Some((req, mut reply)) = rx.recv().await {
                // Obtaining a sender may involve reconnecting with backoff; stop waiting
                // as soon as the caller gives up on this request.
                let sender = select! {
                    sender = get_proxy_request_sender(&mut connector, proxy.clone(), &mut request_sender).fuse() => sender,
                    _ = reply.closed().fuse() => {
                        tracing::info!("client request cancelled");

                        continue
                    },
                };

                tokio::spawn(sender.send_request(req).then(async |res| {
                    // The caller may have gone away in the meantime; nothing to do then.
                    let _ = reply.send(res);
                }));
            }
        });

        client
    }

    /// Sends `req` through the proxy connection and waits for the response.
    ///
    /// # Errors
    ///
    /// Returns the `hyper::Error` reported when sending the request over the proxy
    /// connection fails.
    ///
    /// # Panics
    ///
    /// Panics if the background task spawned by `new` is no longer running, or if the
    /// request is dropped without an answer (for example while the runtime shuts down).
    pub(crate) async fn request(
        &self,
        req: Request<http_body_util::Empty<&'static [u8]>>,
    ) -> hyper::Result<Response<Incoming>> {
        let (reply_tx, reply_rx) = oneshot::channel();

        self.tx
            .send((req, reply_tx))
            .await
            .expect("proxy client background task has terminated");

        reply_rx
            .await
            .expect("proxy client background task dropped the request without answering")
    }
}

/// HTTP/2 request sender for a proxy connection, sending requests with an empty body.
type ProxyRequestSender =
    hyper::client::conn::http2::SendRequest<http_body_util::Empty<&'static [u8]>>;

/// Returns a usable HTTP/2 request sender for `proxy`, reusing the cached one when possible.
///
/// If `request_sender` holds a sender whose `ready()` check succeeds, that sender is
/// returned. Otherwise a new connection is opened with `connect`, retrying indefinitely
/// with exponential backoff (200 ms up to 5 s), and the new sender is stored in
/// `request_sender` before being returned.
///
/// The cached sender stays inside `request_sender` while it is probed. The caller races
/// this future against request cancellation, so dropping the future midway must not
/// discard a connection that is still healthy.
async fn get_proxy_request_sender<'c>(
    connector: &mut HttpsConnector<HttpConnector>,
    proxy: Uri,
    request_sender: &'c mut Option<ProxyRequestSender>,
) -> &'c mut ProxyRequestSender {
    // Probe in place; the short-lived borrow ends once the readiness result is produced.
    let probe = match request_sender.as_mut() {
        Some(sender) => Some(sender.ready().await),
        None => None,
    };

    match probe {
        Some(Ok(())) => {
            return request_sender
                .as_mut()
                .expect("cached sender was present when it was probed");
        }
        Some(Err(e)) => {
            tracing::info!(error = ?e, "old proxy connection is closed, reconnecting");

            // Release the dead handle before reconnecting.
            *request_sender = None;
        }
        None => {
            tracing::info!(proxy = ?proxy, "establishing initial connection");
        }
    }

    let mut exponential_backoff = ExponentialBackoffMaker::new(
        Duration::from_millis(200),
        Duration::from_secs(5),
        0.1,
        HasherRng::default(),
    )
    .expect("backoff bounds and jitter are valid constants")
    .make_backoff();

    loop {
        tracing::debug!(proxy = ?proxy, "connecting to proxy");

        match connect(connector, proxy.clone()).await {
            Ok(sender) => return request_sender.insert(sender),
            Err(e) => tracing::error!(error = ?e, "failed to connect to proxy"),
        }

        exponential_backoff.next_backoff().await;
    }
}

async fn connect(
    connector: &mut HttpsConnector<HttpConnector>,
    proxy: Uri,
) -> Result<ProxyRequestSender, BoxError> {
    let stream = connector.call(proxy).await?;

    let (sender, conn) =
        hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(stream)).await?;

    tokio::spawn(async move {
        if let Err(e) = conn.await {
            tracing::error!(error = ?e, "proxy connection errored out");
        }
    });

    Ok(sender)
}