1pub mod acme;
7mod handler;
8mod routing;
9pub mod tls;
10
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::sync::atomic::AtomicUsize;
14
15use hyper::Request;
16use hyper::body::Incoming;
17use hyper::server::conn::http1;
18use hyper::service::service_fn;
19use hyper_util::rt::TokioIo;
20use tokio::net::TcpListener;
21use tokio::sync::RwLock;
22use tracing::{debug, info, warn};
23
24use acme::AcmeManager;
25use handler::{handle_acme_challenge, handle_request};
26
/// A single destination entry stored in the proxy route table.
///
/// `run_proxy` receives a map from `String` keys to lists of these targets.
// NOTE(review): `address` is presumably the upstream `host:port` requests are
// forwarded to — confirm against the `handler` module, which consumes it.
#[derive(Clone, Debug)]
pub struct RouteTarget {
    /// Address of this target.
    pub address: String,
    /// Name of the service associated with this target.
    pub service_name: String,
}
35
/// Describes one WASM trigger registered with the proxy.
///
/// Triggers are shared with the request handler through [`SharedWasmTriggers`].
// NOTE(review): how `pattern` is matched against a request (prefix, glob, exact)
// is decided in the `handler`/`routing` modules and is not visible here — confirm.
#[derive(Clone, Debug)]
pub struct WasmTrigger {
    /// Pattern associated with this trigger.
    pub pattern: String,
    /// Identifier of the runtime associated with this trigger.
    pub runtime_id: String,
    /// Name of the service associated with this trigger.
    pub service_name: String,
}
46
47pub type WasmInvoker =
50 Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
51
/// Boxed, pinned, `Send` future returned by a WASM invocation.
///
/// Resolves to `Ok(String)` on success or `Err(String)` on failure.
pub type WasmInvokeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
55
56pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
58
59pub async fn run_proxy(
67 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
68 wasm_triggers: SharedWasmTriggers,
69 wasm_invoker: Option<WasmInvoker>,
70 port: u16,
71 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
72 acme_manager: Option<AcmeManager>,
73) -> anyhow::Result<()> {
74 let addr = format!("0.0.0.0:{port}");
75 let listener = TcpListener::bind(&addr).await?;
76 let proto = if tls_acceptor.is_some() {
77 "HTTPS"
78 } else {
79 "HTTP"
80 };
81 info!("Reverse proxy listening on {addr} ({proto})");
82
83 let counter = Arc::new(AtomicUsize::new(0));
84 let client = Arc::new(reqwest::Client::new());
85 let acme = acme_manager.map(Arc::new);
86
87 loop {
88 let (stream, peer) = match listener.accept().await {
89 Ok(conn) => conn,
90 Err(e) => {
91 warn!("Proxy accept error: {e}");
92 continue;
93 }
94 };
95
96 let routes = route_table.clone();
97 let triggers = wasm_triggers.clone();
98 let invoker = wasm_invoker.clone();
99 let counter = counter.clone();
100 let client = client.clone();
101 let acme = acme.clone();
102
103 let tls = tls_acceptor.clone();
104 tokio::spawn(async move {
105 let service = service_fn(move |req: Request<Incoming>| {
106 let routes = routes.clone();
107 let triggers = triggers.clone();
108 let invoker = invoker.clone();
109 let counter = counter.clone();
110 let client = client.clone();
111 let acme = acme.clone();
112 async move {
113 if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
115 return Ok(resp);
116 }
117 handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
118 .await
119 }
120 });
121
122 if let Some(acceptor) = tls {
123 match acceptor.accept(stream).await {
124 Ok(tls_stream) => {
125 let io = TokioIo::new(tls_stream);
126 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
127 debug!("TLS proxy error from {peer}: {e}");
128 }
129 }
130 Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
131 }
132 } else {
133 let io = TokioIo::new(stream);
134 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
135 debug!("Proxy connection error from {peer}: {e}");
136 }
137 }
138 });
139 }
140}