proxyfor 0.5.0

A powerful and flexible proxy CLI for capturing and inspecting HTTP(S) and WS(S) traffic
Documentation
#![allow(unused)]

use anyhow::{anyhow, Result};
use async_compression::tokio::bufread::GzipEncoder;
use bytes::{Bytes, BytesMut};
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use http::{
    header::{CONTENT_ENCODING, CONTENT_TYPE},
    Method, StatusCode,
};
use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody};
use hyper::{
    body::{Frame, Incoming},
    service::service_fn,
    Request, Response,
};
use hyper_util::{
    rt::{TokioExecutor, TokioIo},
    server::conn::auto,
};
use proxyfor::{
    cert::CertificateAuthority,
    server::{PrintMode, Server, ServerBuilder},
};
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tokio::time::timeout;
use tokio::{net::TcpListener, sync::oneshot};
use tokio_graceful::Shutdown;
use tokio_tungstenite::tungstenite::Message;
use tokio_util::io::ReaderStream;

pub const HELLO_WORLD: &str = "Hello, World!";
pub const WORLD: &str = "world";

async fn test_server(req: Request<Incoming>) -> Result<Response<BoxBody<Bytes, anyhow::Error>>> {
    if hyper_tungstenite::is_upgrade_request(&req) {
        let (res, ws) = hyper_tungstenite::upgrade(req, None)?;

        tokio::spawn(async move {
            let mut ws = ws.await.unwrap();

            while let Some(msg) = ws.next().await {
                let msg = msg.unwrap();
                if msg.is_close() {
                    break;
                }
                ws.send(Message::Text(WORLD.into())).await.unwrap();
            }
        });

        let (parts, body) = res.into_parts();
        let bytes = body.collect().await?.to_bytes();
        let body = Full::new(bytes).map_err(|err| anyhow!("{err}")).boxed();

        return Ok(Response::from_parts(parts, body));
    }

    match (req.method(), req.uri().path()) {
        (&Method::GET, "/hello") => Ok(Response::new(
            Full::new(Bytes::from(HELLO_WORLD))
                .map_err(|err| anyhow!("{err}"))
                .boxed(),
        )),
        (&Method::GET, "/hello/gzip") => {
            let stream_body = StreamBody::new(
                ReaderStream::new(GzipEncoder::new(HELLO_WORLD.as_bytes()))
                    .map_ok(Frame::data)
                    .map_err(|err| anyhow!("{err}")),
            );
            let res = Response::builder()
                .header(CONTENT_ENCODING, "gzip")
                .status(StatusCode::OK)
                .body(BoxBody::new(stream_body))?;
            Ok(res)
        }
        (&Method::POST, "/echo") => {
            let content_type = req.headers().get(CONTENT_TYPE).cloned();
            let bytes = req.collect().await?.to_bytes();
            let body = Full::new(bytes).map_err(|err| anyhow!("{err}")).boxed();
            let mut res = Response::new(body);
            if let Some(content_type) = content_type {
                res.headers_mut().insert(CONTENT_TYPE, content_type);
            }
            Ok(res)
        }
        _ => {
            let mut res = Response::default();
            *res.status_mut() = StatusCode::NOT_FOUND;
            Ok(res)
        }
    }
}

pub async fn start_http_server() -> Result<(SocketAddr, oneshot::Sender<()>)> {
    let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?;
    let addr = listener.local_addr()?;
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        let server = auto::Builder::new(TokioExecutor::new());
        let shutdown = Shutdown::new(async { rx.await.unwrap_or_default() });
        let guard = shutdown.guard_weak();

        loop {
            tokio::select! {
                res = listener.accept() => {
                    let Ok((tcp, _)) = res else {
                        continue;
                    };

                    let server = server.clone();

                    shutdown.spawn_task(async move {
                        let _ = server
                            .serve_connection_with_upgrades(TokioIo::new(tcp), service_fn(test_server))
                            .await;
                    });
                }
                _ = guard.cancelled() => {
                    break;
                }
            }
        }

        shutdown.shutdown().await;
    });

    Ok((addr, tx))
}

pub async fn start_proxy(web: bool) -> Result<(SocketAddr, oneshot::Sender<()>)> {
    let server = build_proxy_server(web)?;
    run_proxy_server(server).await
}

pub async fn run_proxy_server(server: Arc<Server>) -> Result<(SocketAddr, oneshot::Sender<()>)> {
    let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?;
    let proxy_addr = listener.local_addr()?;
    let stop = server.run(listener).await?;
    Ok((proxy_addr, stop))
}

pub fn build_proxy_server(web: bool) -> Result<Arc<Server>> {
    let ca = build_ca()?;
    let server = ServerBuilder::new(ca)
        .print_mode(PrintMode::Nothing)
        .web(web)
        .build();
    Ok(server)
}

pub fn build_proxy_client(proxy: &str) -> Result<reqwest::Client> {
    let proxy = reqwest::Proxy::all(proxy)?;
    let ca_cert_file = resolve_fixture_path("proxyfor-ca-cert.cer");
    let ca_cert_data = std::fs::read_to_string(ca_cert_file)?;
    let ca_cert = reqwest::tls::Certificate::from_pem(ca_cert_data.as_bytes())?;

    let client = reqwest::Client::builder()
        .proxy(proxy)
        .add_root_certificate(ca_cert)
        .no_brotli()
        .no_deflate()
        .no_gzip()
        .build()?;

    Ok(client)
}

pub fn build_client() -> Result<reqwest::Client> {
    let ca_cert_file = resolve_fixture_path("proxyfor-ca-cert.cer");
    let ca_cert_data = std::fs::read_to_string(ca_cert_file)?;
    let ca_cert = reqwest::tls::Certificate::from_pem(ca_cert_data.as_bytes())?;

    let client = reqwest::Client::builder()
        .add_root_certificate(ca_cert)
        .no_brotli()
        .no_deflate()
        .no_gzip()
        .build()?;

    Ok(client)
}

pub fn build_ca() -> Result<CertificateAuthority> {
    let ca_cert_file = resolve_fixture_path("proxyfor-ca-cert.cer");
    let private_key_file = resolve_fixture_path("proxyfor-key.pem");
    let ca = proxyfor::cert::init_ca(&ca_cert_file, &private_key_file)?;
    Ok(ca)
}

pub async fn fetch_subscribe(url: &str, timeout: Duration) -> Result<String> {
    let client = build_client()?;
    let res = client.get(url).send().await.unwrap();
    let mut chunks = BytesMut::new();
    let mut stream = res.bytes_stream();
    tokio::select! {
        _ = async {
                while let Some(chunk) = stream.next().await {
                    chunks.extend_from_slice(&chunk.unwrap());
                }
        } => {}
        _ = tokio::time::sleep(timeout) => {}
    }
    let output = std::str::from_utf8(&chunks).unwrap();
    Ok(output.to_string())
}

pub fn resolve_fixture_path(path: &str) -> PathBuf {
    let mut dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    dir.push("tests");
    dir.push("fixtures");
    dir.push(path);
    dir
}

pub fn mask_text(text: &str) -> String {
    let re = fancy_regex::Regex::new(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d+Z)|(\w{3}, \d{2} \w{3} \d{4} \d{2}:\d{2}:\d{2} GMT)").unwrap();
    let text = re.replace_all(text, "<DATETIME>");
    let re = fancy_regex::Regex::new(r#"localhost:\d+"#).unwrap();
    let text = re.replace_all(&text, "localhost:<PORT>");
    let re = fancy_regex::Regex::new(r#""gid": *\d+"#).unwrap();
    let text = re.replace_all(&text, r#""gid":<GID>"#);
    let re = fancy_regex::Regex::new(r#""time": *\d+"#).unwrap();
    let text = re.replace_all(&text, r#""time":<TIME>"#);
    let re = fancy_regex::Regex::new(r#""version": *"\S+""#).unwrap();
    let text = re.replace_all(&text, r#""version":"<VERSION>""#);
    let re = fancy_regex::Regex::new(r#"body_file": *"\S+""#).unwrap();
    let text = re.replace_all(&text, r#"body_file":"<BODY_FILE>""#);
    text.to_string()
}