1pub mod acme;
8mod handler;
9mod routing;
10pub mod tls;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::AtomicUsize;
15
16use hyper::Request;
17use hyper::body::Incoming;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper_util::rt::TokioIo;
21use tokio::net::TcpListener;
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24
25use acme::AcmeManager;
26use handler::{handle_acme_challenge, handle_request};
27
/// One destination that an entry of the proxy's route table can point at.
///
/// The route table handed to [`run_proxy`] maps a `String` key to a list of
/// these targets.
#[derive(Clone, Debug)]
pub struct RouteTarget {
    /// Address of the destination.
    /// NOTE(review): presumably `host:port`; the expected format is decided by
    /// the request handler, which is not part of this file — confirm there.
    pub address: String,

    /// Name of the service this target belongs to.
    pub service_name: String,

    /// Optional path pattern attached to the target (`None` when absent).
    /// NOTE(review): the matching rules live in the routing/handler code —
    /// confirm there before relying on a particular syntax.
    pub path_pattern: Option<String>,
}
40
/// A registered WASM trigger, as stored in [`SharedWasmTriggers`].
#[derive(Clone, Debug)]
pub struct WasmTrigger {
    /// Pattern associated with the trigger.
    /// NOTE(review): what the pattern is matched against (path, host, ...) is
    /// decided by the request handler, which is not part of this file.
    pub pattern: String,

    /// Identifier of the runtime attached to this trigger.
    pub runtime_id: String,

    /// Name of the service the trigger belongs to.
    pub service_name: String,
}
51
52pub type WasmInvoker =
55 Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
56
/// Boxed, `Send` future returned by a [`WasmInvoker`] call.
///
/// It resolves to `Ok(String)` on success or `Err(String)` on failure; both
/// sides are plain strings, so no structured error type crosses this boundary.
pub type WasmInvokeFuture = std::pin::Pin<
    Box<dyn std::future::Future<Output = Result<String, String>> + Send>,
>;
60
61pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
63
64pub async fn run_proxy(
69 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
70 wasm_triggers: SharedWasmTriggers,
71 wasm_invoker: Option<WasmInvoker>,
72 port: u16,
73 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
74 acme_manager: Option<AcmeManager>,
75) -> anyhow::Result<()> {
76 let addr = format!("0.0.0.0:{port}");
77 let listener = TcpListener::bind(&addr).await?;
78 let proto = if tls_acceptor.is_some() {
79 "HTTPS"
80 } else {
81 "HTTP"
82 };
83 info!("Reverse proxy listening on {addr} ({proto})");
84
85 serve_loop(
86 listener,
87 route_table,
88 wasm_triggers,
89 wasm_invoker,
90 tls_acceptor,
91 acme_manager,
92 )
93 .await
94}
95
96pub type SharedCertResolver = Arc<acme::DynCertResolver>;
98
99pub async fn run_proxy_with_acme(
105 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
106 wasm_triggers: SharedWasmTriggers,
107 wasm_invoker: Option<WasmInvoker>,
108 acme_manager: AcmeManager,
109 domains: Vec<String>,
110) -> anyhow::Result<SharedCertResolver> {
111 let resolver = Arc::new(acme::DynCertResolver::new());
112
113 let acme_mgr = acme_manager.clone();
114 let routes_clone = route_table.clone();
115 let triggers_clone = wasm_triggers.clone();
116 let invoker_clone = wasm_invoker.clone();
117
118 let http_handle = tokio::spawn({
120 let acme = acme_mgr.clone();
121 let routes = routes_clone.clone();
122 let triggers = triggers_clone.clone();
123 let invoker = invoker_clone.clone();
124 async move {
125 if let Err(e) = run_proxy(routes, triggers, invoker, 80, None, Some(acme)).await {
126 error!("HTTP listener failed: {e}");
127 }
128 }
129 });
130
131 let resolver_clone = resolver.clone();
133 let https_handle = tokio::spawn(async move {
134 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
135
136 for domain in &domains {
138 if let Err(e) = acme_mgr
139 .ensure_cert_for_resolver(domain, &resolver_clone)
140 .await
141 {
142 error!(domain = %domain, error = %e, "Failed to provision cert");
143 }
144 }
145
146 let config = rustls::ServerConfig::builder()
148 .with_no_client_auth()
149 .with_cert_resolver(resolver_clone);
150
151 let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
152 info!(
153 "Starting HTTPS with SNI resolver ({} domains)",
154 domains.len()
155 );
156
157 let routes = routes_clone;
158 let triggers = triggers_clone;
159 let invoker = invoker_clone;
160 if let Err(e) = run_proxy(
161 routes,
162 triggers,
163 invoker,
164 443,
165 Some(acceptor),
166 Some(acme_mgr),
167 )
168 .await
169 {
170 error!("HTTPS listener failed: {e}");
171 }
172 });
173
174 tokio::spawn(async move {
177 tokio::select! {
178 _ = http_handle => warn!("HTTP listener exited"),
179 _ = https_handle => warn!("HTTPS listener exited"),
180 }
181 });
182
183 Ok(resolver)
184}
185
186async fn serve_loop(
188 listener: TcpListener,
189 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
190 wasm_triggers: SharedWasmTriggers,
191 wasm_invoker: Option<WasmInvoker>,
192 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
193 acme_manager: Option<AcmeManager>,
194) -> anyhow::Result<()> {
195 let counter = Arc::new(AtomicUsize::new(0));
196 let client = Arc::new(
197 reqwest::Client::builder()
198 .no_proxy()
199 .build()
200 .expect("failed to build HTTP client"),
201 );
202 let acme = acme_manager.map(Arc::new);
203
204 loop {
205 let (stream, peer) = match listener.accept().await {
206 Ok(conn) => conn,
207 Err(e) => {
208 warn!("Proxy accept error: {e}");
209 continue;
210 }
211 };
212
213 let routes = route_table.clone();
214 let triggers = wasm_triggers.clone();
215 let invoker = wasm_invoker.clone();
216 let counter = counter.clone();
217 let client = client.clone();
218 let acme = acme.clone();
219 let tls = tls_acceptor.clone();
220
221 tokio::spawn(async move {
222 let service = service_fn(move |req: Request<Incoming>| {
223 let routes = routes.clone();
224 let triggers = triggers.clone();
225 let invoker = invoker.clone();
226 let counter = counter.clone();
227 let client = client.clone();
228 let acme = acme.clone();
229 async move {
230 if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
231 return Ok(resp);
232 }
233 handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
234 .await
235 }
236 });
237
238 if let Some(acceptor) = tls {
239 match acceptor.accept(stream).await {
240 Ok(tls_stream) => {
241 let io = TokioIo::new(tls_stream);
242 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
243 debug!("TLS proxy error from {peer}: {e}");
244 }
245 }
246 Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
247 }
248 } else {
249 let io = TokioIo::new(stream);
250 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
251 debug!("Proxy connection error from {peer}: {e}");
252 }
253 }
254 });
255 }
256}