pub mod acme;
mod body;
mod forward;
mod handler;
pub mod rate_limit;
mod routing;
pub mod sni;
pub mod tls;
mod websocket;
pub use orca_core::config::FallbackConfig;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use hyper::Request;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
use acme::AcmeManager;
use handler::{handle_acme_challenge, handle_request};
use rate_limit::RateLimiter;
/// One routing target; the proxy's route table maps a `String` key to a
/// `Vec<RouteTarget>`.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names and types only — confirm against the `routing`/`forward` modules.
#[derive(Debug, Clone)]
pub struct RouteTarget {
/// Address of the target (presumably `host:port` of the backend — TODO confirm).
pub address: String,
/// Name of the service this target belongs to.
pub service_name: String,
/// Optional path pattern (presumably restricts which request paths match — TODO confirm syntax).
pub path_pattern: Option<String>,
/// Weight of this target (presumably used for weighted selection — TODO confirm).
pub weight: u32,
/// Optional prefix (presumably removed from the request path before forwarding — TODO confirm).
pub strip_prefix: Option<String>,
}
/// One entry of the shared Wasm trigger list (see `SharedWasmTriggers`).
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names only — confirm against the code that matches and invokes triggers.
#[derive(Debug, Clone)]
pub struct WasmTrigger {
/// Pattern associated with this trigger (presumably matched against the request — TODO confirm syntax).
pub pattern: String,
/// Identifier of the Wasm runtime (presumably the instance to invoke — TODO confirm).
pub runtime_id: String,
/// Name of the service this trigger belongs to.
pub service_name: String,
}
/// Shared, thread-safe callback that takes four `String` arguments and returns
/// a [`WasmInvokeFuture`].
///
/// NOTE(review): the meaning and order of the four strings is not visible in
/// this file — confirm against the place where the invoker is called.
pub type WasmInvoker =
Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
/// Pinned, boxed, `Send` future produced by a [`WasmInvoker`]; resolves to
/// `Result<String, String>`.
pub type WasmInvokeFuture =
std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
/// List of [`WasmTrigger`]s behind a `tokio::sync::RwLock`, shared via `Arc`.
pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
/// Binds a TCP listener on `0.0.0.0:<port>` and serves proxy traffic on it
/// without any fallback configuration.
///
/// The listener is reported as "HTTPS" in the startup log when a
/// `tls_acceptor` is supplied and as "HTTP" otherwise.
///
/// # Errors
///
/// Returns an error if the socket cannot be bound, or whatever error the
/// underlying serve loop returns.
pub async fn run_proxy(
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    port: u16,
    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
    acme_manager: Option<AcmeManager>,
) -> anyhow::Result<()> {
    // Pure label computation; only used for the startup log line.
    let proto = match &tls_acceptor {
        Some(_) => "HTTPS",
        None => "HTTP",
    };

    let addr = format!("0.0.0.0:{port}");
    let bound = TcpListener::bind(&addr).await?;
    info!("Reverse proxy listening on {addr} ({proto})");

    serve_loop(
        bound,
        route_table,
        wasm_triggers,
        wasm_invoker,
        tls_acceptor,
        acme_manager,
    )
    .await
}
/// Binds a TCP listener on `0.0.0.0:<port>` and serves proxy traffic on it,
/// passing the optional `fallback` configuration through to the serve loop.
///
/// The listener is reported as "HTTPS" in the startup log when a
/// `tls_acceptor` is supplied and as "HTTP" otherwise.
///
/// # Errors
///
/// Returns an error if the socket cannot be bound, or whatever error the
/// underlying serve loop returns.
// The parameter list mirrors `run_proxy` plus `fallback`; kept flat for callers.
#[allow(clippy::too_many_arguments)]
pub async fn run_proxy_with_fallback(
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    port: u16,
    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
    acme_manager: Option<AcmeManager>,
    fallback: Option<FallbackConfig>,
) -> anyhow::Result<()> {
    let addr = format!("0.0.0.0:{port}");
    let socket = TcpListener::bind(&addr).await?;

    // Label for the startup log only.
    let proto = tls_acceptor.as_ref().map_or("HTTP", |_| "HTTPS");
    info!("Reverse proxy listening on {addr} ({proto})");

    serve_loop_with_fallback(
        socket,
        route_table,
        wasm_triggers,
        wasm_invoker,
        tls_acceptor,
        acme_manager,
        fallback,
    )
    .await
}
/// `Arc`-shared handle to the ACME module's `DynCertResolver`, as returned by
/// the `run_proxy_with_acme*` entry points.
pub type SharedCertResolver = Arc<acme::DynCertResolver>;
pub async fn run_proxy_with_acme(
route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
wasm_triggers: SharedWasmTriggers,
wasm_invoker: Option<WasmInvoker>,
acme_manager: AcmeManager,
domains: Vec<String>,
) -> anyhow::Result<SharedCertResolver> {
run_proxy_with_acme_and_fallback(
route_table,
wasm_triggers,
wasm_invoker,
acme_manager,
domains,
None,
)
.await
}
/// Starts an HTTP listener on port 80 and an HTTPS listener on port 443 as
/// background tasks and returns the shared certificate resolver used by the
/// HTTPS listener.
///
/// Sequence:
/// 1. A new `DynCertResolver` is created.
/// 2. The HTTP listener (port 80, no TLS) is spawned with the ACME manager
///    attached.
/// 3. The HTTPS task waits 100 ms, then calls
///    `ensure_cert_for_resolver` for each entry of `domains`, each bounded by
///    a 60 s timeout. Failures and timeouts are logged and skipped.
/// 4. The HTTPS listener (port 443) is started with a rustls config that uses
///    the resolver for certificate selection.
/// 5. A supervisor task logs when either listener task finishes, including
///    the join error if the task panicked or was cancelled.
///
/// The function itself returns as soon as the tasks are spawned.
///
/// # Errors
///
/// The current implementation always returns `Ok`; listener failures are
/// reported through the log from the background tasks.
// The flat parameter list mirrors the other `run_proxy*` entry points.
#[allow(clippy::too_many_arguments)]
pub async fn run_proxy_with_acme_and_fallback(
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    acme_manager: AcmeManager,
    domains: Vec<String>,
    fallback: Option<FallbackConfig>,
) -> anyhow::Result<SharedCertResolver> {
    let resolver = Arc::new(acme::DynCertResolver::new());

    // The HTTP task gets its own clones; the HTTPS task below takes ownership
    // of the original values, so each handle is cloned exactly once.
    let http_handle = tokio::spawn({
        let routes = route_table.clone();
        let triggers = wasm_triggers.clone();
        let invoker = wasm_invoker.clone();
        let acme = acme_manager.clone();
        let http_fallback = fallback.clone();
        async move {
            if let Err(e) = run_proxy_with_fallback(
                routes,
                triggers,
                invoker,
                80,
                None,
                Some(acme),
                http_fallback,
            )
            .await
            {
                error!("HTTP listener failed: {e}");
            }
        }
    });

    let https_resolver = resolver.clone();
    let https_handle = tokio::spawn(async move {
        // Short delay before provisioning starts (the HTTP listener task was
        // spawned just above).
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        const PER_DOMAIN_PROVISION_TIMEOUT: std::time::Duration =
            std::time::Duration::from_secs(60);

        // Provision sequentially; one slow or failing domain must not prevent
        // the HTTPS listener from starting.
        for domain in &domains {
            let fut = acme_manager.ensure_cert_for_resolver(domain, &https_resolver);
            match tokio::time::timeout(PER_DOMAIN_PROVISION_TIMEOUT, fut).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    error!(domain = %domain, error = %e, "Failed to provision cert");
                }
                Err(_) => {
                    warn!(
                        domain = %domain,
                        timeout_secs = PER_DOMAIN_PROVISION_TIMEOUT.as_secs(),
                        "Cert provisioning timed out — skipping (HTTPS will start without this cert; reconciler may retry on demand)"
                    );
                }
            }
        }

        let config = rustls::ServerConfig::builder()
            .with_no_client_auth()
            .with_cert_resolver(https_resolver);
        let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
        info!(
            "Starting HTTPS with SNI resolver ({} domains)",
            domains.len()
        );

        if let Err(e) = run_proxy_with_fallback(
            route_table,
            wasm_triggers,
            wasm_invoker,
            443,
            Some(acceptor),
            Some(acme_manager),
            fallback,
        )
        .await
        {
            error!("HTTPS listener failed: {e}");
        }
    });

    // Supervisor: report whichever listener task finishes first. A `JoinError`
    // (panic or cancellation) is logged explicitly instead of being discarded.
    tokio::spawn(async move {
        tokio::select! {
            outcome = http_handle => {
                if let Err(join_err) = outcome {
                    error!("HTTP listener task terminated abnormally: {join_err}");
                }
                warn!("HTTP listener exited");
            }
            outcome = https_handle => {
                if let Err(join_err) = outcome {
                    error!("HTTPS listener task terminated abnormally: {join_err}");
                }
                warn!("HTTPS listener exited");
            }
        }
    });

    Ok(resolver)
}
async fn serve_loop(
listener: TcpListener,
route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
wasm_triggers: SharedWasmTriggers,
wasm_invoker: Option<WasmInvoker>,
tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
acme_manager: Option<AcmeManager>,
) -> anyhow::Result<()> {
serve_loop_with_fallback(
listener,
route_table,
wasm_triggers,
wasm_invoker,
tls_acceptor,
acme_manager,
None,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn serve_loop_with_fallback(
listener: TcpListener,
route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
wasm_triggers: SharedWasmTriggers,
wasm_invoker: Option<WasmInvoker>,
tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
acme_manager: Option<AcmeManager>,
fallback: Option<FallbackConfig>,
) -> anyhow::Result<()> {
let counter = Arc::new(AtomicUsize::new(0));
let client = Arc::new(
reqwest::Client::builder()
.no_proxy()
.redirect(reqwest::redirect::Policy::none())
.connect_timeout(std::time::Duration::from_secs(10))
.timeout(std::time::Duration::from_secs(300))
.pool_idle_timeout(std::time::Duration::from_secs(90))
.build()
.expect("failed to build HTTP client"),
);
let acme = acme_manager.map(Arc::new);
let is_tls = tls_acceptor.is_some();
let rate_limiter = RateLimiter::new();
let fallback = Arc::new(fallback);
loop {
let (stream, peer) = match listener.accept().await {
Ok(conn) => conn,
Err(e) => {
warn!("Proxy accept error: {e}");
continue;
}
};
let routes = route_table.clone();
let triggers = wasm_triggers.clone();
let invoker = wasm_invoker.clone();
let counter = counter.clone();
let client = client.clone();
let acme = acme.clone();
let tls = tls_acceptor.clone();
let rl = rate_limiter.clone();
let fb = fallback.clone();
let routes_for_sni = routes.clone();
let fb_for_service = fb.clone();
tokio::spawn(async move {
let service = service_fn(move |req: Request<Incoming>| {
let routes = routes.clone();
let triggers = triggers.clone();
let invoker = invoker.clone();
let counter = counter.clone();
let client = client.clone();
let acme = acme.clone();
let rl = rl.clone();
let fb = fb_for_service.clone();
async move {
if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
return Ok(resp);
}
handle_request(
req,
&routes,
&triggers,
invoker.as_ref(),
&counter,
&client,
is_tls,
&rl,
peer,
fb.as_ref().as_ref(),
)
.await
}
});
if let Some(acceptor) = tls {
let mut stream = stream;
let sni = sni::peek_sni(&mut stream).await;
let should_passthrough = if let Some(ref host) = sni {
let routes_lock = routes_for_sni.read().await;
let known = routes_lock.contains_key(host);
drop(routes_lock);
!known && fb.as_ref().as_ref().and_then(|f| f.tls.as_ref()).is_some()
} else {
false
};
if should_passthrough {
let target = fb
.as_ref()
.as_ref()
.and_then(|f| f.tls.clone())
.expect("checked above");
debug!(?sni, %target, "SNI passthrough");
match tokio::net::TcpStream::connect(&target).await {
Ok(mut backend) => {
if let Err(e) =
tokio::io::copy_bidirectional(&mut stream, &mut backend).await
{
debug!("Passthrough copy error from {peer}: {e}");
}
}
Err(e) => warn!("Failed to connect to TLS fallback {target}: {e}"),
}
return;
}
match acceptor.accept(stream).await {
Ok(tls_stream) => {
let io = TokioIo::new(tls_stream);
if let Err(e) = http1::Builder::new()
.serve_connection(io, service)
.with_upgrades()
.await
{
debug!("TLS proxy error from {peer}: {e}");
}
}
Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
}
} else {
let io = TokioIo::new(stream);
if let Err(e) = http1::Builder::new()
.serve_connection(io, service)
.with_upgrades()
.await
{
debug!("Proxy connection error from {peer}: {e}");
}
}
});
}
}