1pub mod acme;
8mod forward;
9mod handler;
10pub mod rate_limit;
11mod routing;
12pub mod tls;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::sync::atomic::AtomicUsize;
17
18use hyper::Request;
19use hyper::body::Incoming;
20use hyper::server::conn::http1;
21use hyper::service::service_fn;
22use hyper_util::rt::TokioIo;
23use tokio::net::TcpListener;
24use tokio::sync::RwLock;
25use tracing::{debug, error, info, warn};
26
27use acme::AcmeManager;
28use handler::{handle_acme_challenge, handle_request};
29use rate_limit::RateLimiter;
30
31#[derive(Debug, Clone)]
33pub struct RouteTarget {
34 pub address: String,
36 pub service_name: String,
38 pub path_pattern: Option<String>,
42}
43
44#[derive(Debug, Clone)]
46pub struct WasmTrigger {
47 pub pattern: String,
49 pub runtime_id: String,
51 pub service_name: String,
53}
54
55pub type WasmInvoker =
58 Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
59
60pub type WasmInvokeFuture =
62 std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
63
64pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
66
67pub async fn run_proxy(
72 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
73 wasm_triggers: SharedWasmTriggers,
74 wasm_invoker: Option<WasmInvoker>,
75 port: u16,
76 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
77 acme_manager: Option<AcmeManager>,
78) -> anyhow::Result<()> {
79 let addr = format!("0.0.0.0:{port}");
80 let listener = TcpListener::bind(&addr).await?;
81 let proto = if tls_acceptor.is_some() {
82 "HTTPS"
83 } else {
84 "HTTP"
85 };
86 info!("Reverse proxy listening on {addr} ({proto})");
87
88 serve_loop(
89 listener,
90 route_table,
91 wasm_triggers,
92 wasm_invoker,
93 tls_acceptor,
94 acme_manager,
95 )
96 .await
97}
98
99pub type SharedCertResolver = Arc<acme::DynCertResolver>;
101
102pub async fn run_proxy_with_acme(
108 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
109 wasm_triggers: SharedWasmTriggers,
110 wasm_invoker: Option<WasmInvoker>,
111 acme_manager: AcmeManager,
112 domains: Vec<String>,
113) -> anyhow::Result<SharedCertResolver> {
114 let resolver = Arc::new(acme::DynCertResolver::new());
115
116 let acme_mgr = acme_manager.clone();
117 let routes_clone = route_table.clone();
118 let triggers_clone = wasm_triggers.clone();
119 let invoker_clone = wasm_invoker.clone();
120
121 let http_handle = tokio::spawn({
123 let acme = acme_mgr.clone();
124 let routes = routes_clone.clone();
125 let triggers = triggers_clone.clone();
126 let invoker = invoker_clone.clone();
127 async move {
128 if let Err(e) = run_proxy(routes, triggers, invoker, 80, None, Some(acme)).await {
129 error!("HTTP listener failed: {e}");
130 }
131 }
132 });
133
134 let resolver_clone = resolver.clone();
136 let https_handle = tokio::spawn(async move {
137 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
138
139 for domain in &domains {
141 if let Err(e) = acme_mgr
142 .ensure_cert_for_resolver(domain, &resolver_clone)
143 .await
144 {
145 error!(domain = %domain, error = %e, "Failed to provision cert");
146 }
147 }
148
149 let config = rustls::ServerConfig::builder()
151 .with_no_client_auth()
152 .with_cert_resolver(resolver_clone);
153
154 let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
155 info!(
156 "Starting HTTPS with SNI resolver ({} domains)",
157 domains.len()
158 );
159
160 let routes = routes_clone;
161 let triggers = triggers_clone;
162 let invoker = invoker_clone;
163 if let Err(e) = run_proxy(
164 routes,
165 triggers,
166 invoker,
167 443,
168 Some(acceptor),
169 Some(acme_mgr),
170 )
171 .await
172 {
173 error!("HTTPS listener failed: {e}");
174 }
175 });
176
177 tokio::spawn(async move {
180 tokio::select! {
181 _ = http_handle => warn!("HTTP listener exited"),
182 _ = https_handle => warn!("HTTPS listener exited"),
183 }
184 });
185
186 Ok(resolver)
187}
188
189async fn serve_loop(
191 listener: TcpListener,
192 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
193 wasm_triggers: SharedWasmTriggers,
194 wasm_invoker: Option<WasmInvoker>,
195 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
196 acme_manager: Option<AcmeManager>,
197) -> anyhow::Result<()> {
198 let counter = Arc::new(AtomicUsize::new(0));
199 let client = Arc::new(
200 reqwest::Client::builder()
201 .no_proxy()
202 .build()
203 .expect("failed to build HTTP client"),
204 );
205 let acme = acme_manager.map(Arc::new);
206 let is_tls = tls_acceptor.is_some();
207 let rate_limiter = RateLimiter::new();
208
209 loop {
210 let (stream, peer) = match listener.accept().await {
211 Ok(conn) => conn,
212 Err(e) => {
213 warn!("Proxy accept error: {e}");
214 continue;
215 }
216 };
217
218 let routes = route_table.clone();
219 let triggers = wasm_triggers.clone();
220 let invoker = wasm_invoker.clone();
221 let counter = counter.clone();
222 let client = client.clone();
223 let acme = acme.clone();
224 let tls = tls_acceptor.clone();
225 let rl = rate_limiter.clone();
226
227 tokio::spawn(async move {
228 let service = service_fn(move |req: Request<Incoming>| {
229 let routes = routes.clone();
230 let triggers = triggers.clone();
231 let invoker = invoker.clone();
232 let counter = counter.clone();
233 let client = client.clone();
234 let acme = acme.clone();
235 let rl = rl.clone();
236 async move {
237 if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
238 return Ok(resp);
239 }
240 handle_request(
241 req,
242 &routes,
243 &triggers,
244 invoker.as_ref(),
245 &counter,
246 &client,
247 is_tls,
248 &rl,
249 peer,
250 )
251 .await
252 }
253 });
254
255 if let Some(acceptor) = tls {
256 match acceptor.accept(stream).await {
257 Ok(tls_stream) => {
258 let io = TokioIo::new(tls_stream);
259 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
260 debug!("TLS proxy error from {peer}: {e}");
261 }
262 }
263 Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
264 }
265 } else {
266 let io = TokioIo::new(stream);
267 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
268 debug!("Proxy connection error from {peer}: {e}");
269 }
270 }
271 });
272 }
273}