1pub mod acme;
8mod handler;
9mod routing;
10pub mod tls;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::AtomicUsize;
15
16use hyper::Request;
17use hyper::body::Incoming;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper_util::rt::TokioIo;
21use tokio::net::TcpListener;
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24
25use acme::AcmeManager;
26use handler::{handle_acme_challenge, handle_request};
27
/// One destination that a route-table entry can resolve to.
///
/// Instances are stored as `Vec<RouteTarget>` values in the route table passed
/// to [`run_proxy`] and [`run_proxy_with_acme`].
#[derive(Debug, Clone)]
pub struct RouteTarget {
    /// Address of the target (presumably the upstream `host:port` that requests
    /// are forwarded to — TODO confirm against the `handler` module).
    pub address: String,
    /// Name of the service this target belongs to.
    pub service_name: String,
    /// Optional path pattern associated with this target; `None` means no
    /// pattern is set. NOTE(review): matching semantics live outside this
    /// block (likely in `routing`) — confirm there.
    pub path_pattern: Option<String>,
}
40
/// A registered WASM trigger, shared with the proxy through
/// [`SharedWasmTriggers`].
#[derive(Debug, Clone)]
pub struct WasmTrigger {
    /// Pattern that selects this trigger. NOTE(review): what the pattern is
    /// matched against (presumably the request path) is decided outside this
    /// block — confirm in `handler`/`routing`.
    pub pattern: String,
    /// Identifier of the WASM runtime associated with this trigger.
    pub runtime_id: String,
    /// Name of the service that owns this trigger.
    pub service_name: String,
}
51
/// Shareable, thread-safe callback used to invoke a WASM function.
///
/// Takes four owned `String` arguments and returns a [`WasmInvokeFuture`].
/// NOTE(review): the meaning and order of the four arguments is not visible
/// here — confirm against the call site in `handler`.
pub type WasmInvoker =
    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;

/// Boxed, pinned, `Send` future returned by a [`WasmInvoker`]; resolves to
/// `Ok(String)` on success or `Err(String)` on failure.
pub type WasmInvokeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;

/// List of [`WasmTrigger`]s behind an async `RwLock`, shared via `Arc` between
/// the proxy and whoever updates the triggers.
pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
63
64pub async fn run_proxy(
69 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
70 wasm_triggers: SharedWasmTriggers,
71 wasm_invoker: Option<WasmInvoker>,
72 port: u16,
73 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
74 acme_manager: Option<AcmeManager>,
75) -> anyhow::Result<()> {
76 let addr = format!("0.0.0.0:{port}");
77 let listener = TcpListener::bind(&addr).await?;
78 let proto = if tls_acceptor.is_some() {
79 "HTTPS"
80 } else {
81 "HTTP"
82 };
83 info!("Reverse proxy listening on {addr} ({proto})");
84
85 serve_loop(
86 listener,
87 route_table,
88 wasm_triggers,
89 wasm_invoker,
90 tls_acceptor,
91 acme_manager,
92 )
93 .await
94}
95
96pub async fn run_proxy_with_acme(
100 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
101 wasm_triggers: SharedWasmTriggers,
102 wasm_invoker: Option<WasmInvoker>,
103 acme_manager: AcmeManager,
104 domains: Vec<String>,
105) -> anyhow::Result<()> {
106 let provider = acme_manager.provider();
107
108 let acme_mgr = acme_manager.clone();
111 let routes_clone = route_table.clone();
112 let triggers_clone = wasm_triggers.clone();
113 let invoker_clone = wasm_invoker.clone();
114
115 let http_handle = tokio::spawn({
117 let acme = acme_mgr.clone();
118 let routes = routes_clone.clone();
119 let triggers = triggers_clone.clone();
120 let invoker = invoker_clone.clone();
121 async move {
122 if let Err(e) = run_proxy(routes, triggers, invoker, 80, None, Some(acme)).await {
123 error!("HTTP listener failed: {e}");
124 }
125 }
126 });
127
128 let https_handle = tokio::spawn(async move {
130 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
132
133 for domain in &domains {
134 info!(domain = %domain, "Auto-provisioning TLS certificate");
135 match provider.ensure_cert(domain).await {
136 Ok(acceptor) => {
137 info!(domain = %domain, "TLS cert ready, starting HTTPS");
138 let routes = routes_clone.clone();
139 let triggers = triggers_clone.clone();
140 let invoker = invoker_clone.clone();
141 let acme = acme_mgr.clone();
142 if let Err(e) =
144 run_proxy(routes, triggers, invoker, 443, Some(acceptor), Some(acme)).await
145 {
146 error!("HTTPS listener failed: {e}");
147 }
148 return;
149 }
150 Err(e) => {
151 error!(domain = %domain, error = %e, "Failed to provision cert");
152 }
153 }
154 }
155 error!("No TLS certs could be provisioned — HTTPS not started");
156 });
157
158 tokio::select! {
159 _ = http_handle => warn!("HTTP listener exited"),
160 _ = https_handle => warn!("HTTPS listener exited"),
161 }
162 Ok(())
163}
164
165async fn serve_loop(
167 listener: TcpListener,
168 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
169 wasm_triggers: SharedWasmTriggers,
170 wasm_invoker: Option<WasmInvoker>,
171 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
172 acme_manager: Option<AcmeManager>,
173) -> anyhow::Result<()> {
174 let counter = Arc::new(AtomicUsize::new(0));
175 let client = Arc::new(
176 reqwest::Client::builder()
177 .no_proxy()
178 .build()
179 .expect("failed to build HTTP client"),
180 );
181 let acme = acme_manager.map(Arc::new);
182
183 loop {
184 let (stream, peer) = match listener.accept().await {
185 Ok(conn) => conn,
186 Err(e) => {
187 warn!("Proxy accept error: {e}");
188 continue;
189 }
190 };
191
192 let routes = route_table.clone();
193 let triggers = wasm_triggers.clone();
194 let invoker = wasm_invoker.clone();
195 let counter = counter.clone();
196 let client = client.clone();
197 let acme = acme.clone();
198 let tls = tls_acceptor.clone();
199
200 tokio::spawn(async move {
201 let service = service_fn(move |req: Request<Incoming>| {
202 let routes = routes.clone();
203 let triggers = triggers.clone();
204 let invoker = invoker.clone();
205 let counter = counter.clone();
206 let client = client.clone();
207 let acme = acme.clone();
208 async move {
209 if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
210 return Ok(resp);
211 }
212 handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
213 .await
214 }
215 });
216
217 if let Some(acceptor) = tls {
218 match acceptor.accept(stream).await {
219 Ok(tls_stream) => {
220 let io = TokioIo::new(tls_stream);
221 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
222 debug!("TLS proxy error from {peer}: {e}");
223 }
224 }
225 Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
226 }
227 } else {
228 let io = TokioIo::new(stream);
229 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
230 debug!("Proxy connection error from {peer}: {e}");
231 }
232 }
233 });
234 }
235}