pub mod acme;
mod handler;
mod routing;
pub mod tls;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use hyper::Request;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use acme::AcmeManager;
use handler::{handle_acme_challenge, handle_request};
#[derive(Debug, Clone)]
pub struct RouteTarget {
pub address: String,
pub service_name: String,
}
#[derive(Debug, Clone)]
pub struct WasmTrigger {
pub pattern: String,
pub runtime_id: String,
pub service_name: String,
}
pub type WasmInvoker =
Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
pub type WasmInvokeFuture =
std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
pub async fn run_proxy(
route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
wasm_triggers: SharedWasmTriggers,
wasm_invoker: Option<WasmInvoker>,
port: u16,
tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
acme_manager: Option<AcmeManager>,
) -> anyhow::Result<()> {
let addr = format!("0.0.0.0:{port}");
let listener = TcpListener::bind(&addr).await?;
let proto = if tls_acceptor.is_some() {
"HTTPS"
} else {
"HTTP"
};
info!("Reverse proxy listening on {addr} ({proto})");
let counter = Arc::new(AtomicUsize::new(0));
let client = Arc::new(reqwest::Client::new());
let acme = acme_manager.map(Arc::new);
loop {
let (stream, peer) = match listener.accept().await {
Ok(conn) => conn,
Err(e) => {
warn!("Proxy accept error: {e}");
continue;
}
};
let routes = route_table.clone();
let triggers = wasm_triggers.clone();
let invoker = wasm_invoker.clone();
let counter = counter.clone();
let client = client.clone();
let acme = acme.clone();
let tls = tls_acceptor.clone();
tokio::spawn(async move {
let service = service_fn(move |req: Request<Incoming>| {
let routes = routes.clone();
let triggers = triggers.clone();
let invoker = invoker.clone();
let counter = counter.clone();
let client = client.clone();
let acme = acme.clone();
async move {
if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
return Ok(resp);
}
handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
.await
}
});
if let Some(acceptor) = tls {
match acceptor.accept(stream).await {
Ok(tls_stream) => {
let io = TokioIo::new(tls_stream);
if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
debug!("TLS proxy error from {peer}: {e}");
}
}
Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
}
} else {
let io = TokioIo::new(stream);
if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
debug!("Proxy connection error from {peer}: {e}");
}
}
});
}
}