1pub mod acme;
8mod forward;
9mod handler;
10pub mod rate_limit;
11mod routing;
12pub mod sni;
13pub mod tls;
14mod websocket;
15
16pub use orca_core::config::FallbackConfig;
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::sync::atomic::AtomicUsize;
21
22use hyper::Request;
23use hyper::body::Incoming;
24use hyper::server::conn::http1;
25use hyper::service::service_fn;
26use hyper_util::rt::TokioIo;
27use tokio::net::TcpListener;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info, warn};
30
31use acme::AcmeManager;
32use handler::{handle_acme_challenge, handle_request};
33use rate_limit::RateLimiter;
34
35#[derive(Debug, Clone)]
37pub struct RouteTarget {
38 pub address: String,
40 pub service_name: String,
42 pub path_pattern: Option<String>,
46 pub weight: u32,
49 pub strip_prefix: Option<String>,
54}
55
56#[derive(Debug, Clone)]
58pub struct WasmTrigger {
59 pub pattern: String,
61 pub runtime_id: String,
63 pub service_name: String,
65}
66
67pub type WasmInvoker =
70 Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
71
72pub type WasmInvokeFuture =
74 std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
75
76pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
78
79pub async fn run_proxy(
81 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
82 wasm_triggers: SharedWasmTriggers,
83 wasm_invoker: Option<WasmInvoker>,
84 port: u16,
85 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
86 acme_manager: Option<AcmeManager>,
87) -> anyhow::Result<()> {
88 let addr = format!("0.0.0.0:{port}");
89 let listener = TcpListener::bind(&addr).await?;
90 let proto = if tls_acceptor.is_some() {
91 "HTTPS"
92 } else {
93 "HTTP"
94 };
95 info!("Reverse proxy listening on {addr} ({proto})");
96
97 serve_loop(
98 listener,
99 route_table,
100 wasm_triggers,
101 wasm_invoker,
102 tls_acceptor,
103 acme_manager,
104 )
105 .await
106}
107
108#[allow(clippy::too_many_arguments)]
110pub async fn run_proxy_with_fallback(
111 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
112 wasm_triggers: SharedWasmTriggers,
113 wasm_invoker: Option<WasmInvoker>,
114 port: u16,
115 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
116 acme_manager: Option<AcmeManager>,
117 fallback: Option<FallbackConfig>,
118) -> anyhow::Result<()> {
119 let addr = format!("0.0.0.0:{port}");
120 let listener = TcpListener::bind(&addr).await?;
121 let proto = if tls_acceptor.is_some() {
122 "HTTPS"
123 } else {
124 "HTTP"
125 };
126 info!("Reverse proxy listening on {addr} ({proto})");
127
128 serve_loop_with_fallback(
129 listener,
130 route_table,
131 wasm_triggers,
132 wasm_invoker,
133 tls_acceptor,
134 acme_manager,
135 fallback,
136 )
137 .await
138}
139
140pub type SharedCertResolver = Arc<acme::DynCertResolver>;
142
143pub async fn run_proxy_with_acme(
149 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
150 wasm_triggers: SharedWasmTriggers,
151 wasm_invoker: Option<WasmInvoker>,
152 acme_manager: AcmeManager,
153 domains: Vec<String>,
154) -> anyhow::Result<SharedCertResolver> {
155 run_proxy_with_acme_and_fallback(
156 route_table,
157 wasm_triggers,
158 wasm_invoker,
159 acme_manager,
160 domains,
161 None,
162 )
163 .await
164}
165
166#[allow(clippy::too_many_arguments)]
168pub async fn run_proxy_with_acme_and_fallback(
169 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
170 wasm_triggers: SharedWasmTriggers,
171 wasm_invoker: Option<WasmInvoker>,
172 acme_manager: AcmeManager,
173 domains: Vec<String>,
174 fallback: Option<FallbackConfig>,
175) -> anyhow::Result<SharedCertResolver> {
176 let resolver = Arc::new(acme::DynCertResolver::new());
177
178 let acme_mgr = acme_manager.clone();
179 let routes_clone = route_table.clone();
180 let triggers_clone = wasm_triggers.clone();
181 let invoker_clone = wasm_invoker.clone();
182 let fallback_http = fallback.clone();
183 let fallback_tls = fallback.clone();
184
185 let http_handle = tokio::spawn({
187 let acme = acme_mgr.clone();
188 let routes = routes_clone.clone();
189 let triggers = triggers_clone.clone();
190 let invoker = invoker_clone.clone();
191 async move {
192 if let Err(e) = run_proxy_with_fallback(
193 routes,
194 triggers,
195 invoker,
196 80,
197 None,
198 Some(acme),
199 fallback_http,
200 )
201 .await
202 {
203 error!("HTTP listener failed: {e}");
204 }
205 }
206 });
207
208 let resolver_clone = resolver.clone();
210 let https_handle = tokio::spawn(async move {
211 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
212
213 for domain in &domains {
215 if let Err(e) = acme_mgr
216 .ensure_cert_for_resolver(domain, &resolver_clone)
217 .await
218 {
219 error!(domain = %domain, error = %e, "Failed to provision cert");
220 }
221 }
222
223 let config = rustls::ServerConfig::builder()
225 .with_no_client_auth()
226 .with_cert_resolver(resolver_clone);
227
228 let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
229 info!(
230 "Starting HTTPS with SNI resolver ({} domains)",
231 domains.len()
232 );
233
234 let routes = routes_clone;
235 let triggers = triggers_clone;
236 let invoker = invoker_clone;
237 if let Err(e) = run_proxy_with_fallback(
238 routes,
239 triggers,
240 invoker,
241 443,
242 Some(acceptor),
243 Some(acme_mgr),
244 fallback_tls,
245 )
246 .await
247 {
248 error!("HTTPS listener failed: {e}");
249 }
250 });
251
252 tokio::spawn(async move {
255 tokio::select! {
256 _ = http_handle => warn!("HTTP listener exited"),
257 _ = https_handle => warn!("HTTPS listener exited"),
258 }
259 });
260
261 Ok(resolver)
262}
263
264async fn serve_loop(
266 listener: TcpListener,
267 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
268 wasm_triggers: SharedWasmTriggers,
269 wasm_invoker: Option<WasmInvoker>,
270 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
271 acme_manager: Option<AcmeManager>,
272) -> anyhow::Result<()> {
273 serve_loop_with_fallback(
274 listener,
275 route_table,
276 wasm_triggers,
277 wasm_invoker,
278 tls_acceptor,
279 acme_manager,
280 None,
281 )
282 .await
283}
284
285#[allow(clippy::too_many_arguments)]
287pub(crate) async fn serve_loop_with_fallback(
288 listener: TcpListener,
289 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
290 wasm_triggers: SharedWasmTriggers,
291 wasm_invoker: Option<WasmInvoker>,
292 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
293 acme_manager: Option<AcmeManager>,
294 fallback: Option<FallbackConfig>,
295) -> anyhow::Result<()> {
296 let counter = Arc::new(AtomicUsize::new(0));
297 let client = Arc::new(
298 reqwest::Client::builder()
299 .no_proxy()
300 .redirect(reqwest::redirect::Policy::none())
301 .build()
302 .expect("failed to build HTTP client"),
303 );
304 let acme = acme_manager.map(Arc::new);
305 let is_tls = tls_acceptor.is_some();
306 let rate_limiter = RateLimiter::new();
307
308 let fallback = Arc::new(fallback);
309 loop {
310 let (stream, peer) = match listener.accept().await {
311 Ok(conn) => conn,
312 Err(e) => {
313 warn!("Proxy accept error: {e}");
314 continue;
315 }
316 };
317
318 let routes = route_table.clone();
319 let triggers = wasm_triggers.clone();
320 let invoker = wasm_invoker.clone();
321 let counter = counter.clone();
322 let client = client.clone();
323 let acme = acme.clone();
324 let tls = tls_acceptor.clone();
325 let rl = rate_limiter.clone();
326 let fb = fallback.clone();
327 let routes_for_sni = routes.clone();
328
329 tokio::spawn(async move {
330 let service = service_fn(move |req: Request<Incoming>| {
331 let routes = routes.clone();
332 let triggers = triggers.clone();
333 let invoker = invoker.clone();
334 let counter = counter.clone();
335 let client = client.clone();
336 let acme = acme.clone();
337 let rl = rl.clone();
338 async move {
339 if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
340 return Ok(resp);
341 }
342 handle_request(
343 req,
344 &routes,
345 &triggers,
346 invoker.as_ref(),
347 &counter,
348 &client,
349 is_tls,
350 &rl,
351 peer,
352 )
353 .await
354 }
355 });
356 if let Some(acceptor) = tls {
357 let mut stream = stream;
358 let sni = sni::peek_sni(&mut stream).await;
360 let should_passthrough = if let Some(ref host) = sni {
361 let routes_lock = routes_for_sni.read().await;
362 let known = routes_lock.contains_key(host);
363 drop(routes_lock);
364 !known && fb.as_ref().as_ref().and_then(|f| f.tls.as_ref()).is_some()
365 } else {
366 false
367 };
368
369 if should_passthrough {
370 let target = fb
371 .as_ref()
372 .as_ref()
373 .and_then(|f| f.tls.clone())
374 .expect("checked above");
375 debug!(?sni, %target, "SNI passthrough");
376 match tokio::net::TcpStream::connect(&target).await {
377 Ok(mut backend) => {
378 if let Err(e) =
379 tokio::io::copy_bidirectional(&mut stream, &mut backend).await
380 {
381 debug!("Passthrough copy error from {peer}: {e}");
382 }
383 }
384 Err(e) => warn!("Failed to connect to TLS fallback {target}: {e}"),
385 }
386 return;
387 }
388
389 match acceptor.accept(stream).await {
390 Ok(tls_stream) => {
391 let io = TokioIo::new(tls_stream);
392 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
393 debug!("TLS proxy error from {peer}: {e}");
394 }
395 }
396 Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
397 }
398 } else {
399 let io = TokioIo::new(stream);
400 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
401 debug!("Proxy connection error from {peer}: {e}");
402 }
403 }
404 });
405 }
406}