1pub mod acme;
8mod body;
9mod forward;
10mod handler;
11pub mod rate_limit;
12mod routing;
13pub mod sni;
14pub mod tls;
15mod websocket;
16
17pub use orca_core::config::FallbackConfig;
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::sync::atomic::AtomicUsize;
22
23use hyper::Request;
24use hyper::body::Incoming;
25use hyper::server::conn::http1;
26use hyper::service::service_fn;
27use hyper_util::rt::TokioIo;
28use tokio::net::TcpListener;
29use tokio::sync::RwLock;
30use tracing::{debug, error, info, warn};
31
32use acme::AcmeManager;
33use handler::{handle_acme_challenge, handle_request};
34use rate_limit::RateLimiter;
35
36#[derive(Debug, Clone)]
38pub struct RouteTarget {
39 pub address: String,
41 pub service_name: String,
43 pub path_pattern: Option<String>,
47 pub weight: u32,
50 pub strip_prefix: Option<String>,
55}
56
57#[derive(Debug, Clone)]
59pub struct WasmTrigger {
60 pub pattern: String,
62 pub runtime_id: String,
64 pub service_name: String,
66}
67
68pub type WasmInvoker =
71 Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
72
73pub type WasmInvokeFuture =
75 std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
76
77pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
79
80pub async fn run_proxy(
82 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
83 wasm_triggers: SharedWasmTriggers,
84 wasm_invoker: Option<WasmInvoker>,
85 port: u16,
86 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
87 acme_manager: Option<AcmeManager>,
88) -> anyhow::Result<()> {
89 let addr = format!("0.0.0.0:{port}");
90 let listener = TcpListener::bind(&addr).await?;
91 let proto = if tls_acceptor.is_some() {
92 "HTTPS"
93 } else {
94 "HTTP"
95 };
96 info!("Reverse proxy listening on {addr} ({proto})");
97
98 serve_loop(
99 listener,
100 route_table,
101 wasm_triggers,
102 wasm_invoker,
103 tls_acceptor,
104 acme_manager,
105 )
106 .await
107}
108
109#[allow(clippy::too_many_arguments)]
111pub async fn run_proxy_with_fallback(
112 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
113 wasm_triggers: SharedWasmTriggers,
114 wasm_invoker: Option<WasmInvoker>,
115 port: u16,
116 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
117 acme_manager: Option<AcmeManager>,
118 fallback: Option<FallbackConfig>,
119) -> anyhow::Result<()> {
120 let addr = format!("0.0.0.0:{port}");
121 let listener = TcpListener::bind(&addr).await?;
122 let proto = if tls_acceptor.is_some() {
123 "HTTPS"
124 } else {
125 "HTTP"
126 };
127 info!("Reverse proxy listening on {addr} ({proto})");
128
129 serve_loop_with_fallback(
130 listener,
131 route_table,
132 wasm_triggers,
133 wasm_invoker,
134 tls_acceptor,
135 acme_manager,
136 fallback,
137 )
138 .await
139}
140
141pub type SharedCertResolver = Arc<acme::DynCertResolver>;
143
144pub async fn run_proxy_with_acme(
150 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
151 wasm_triggers: SharedWasmTriggers,
152 wasm_invoker: Option<WasmInvoker>,
153 acme_manager: AcmeManager,
154 domains: Vec<String>,
155) -> anyhow::Result<SharedCertResolver> {
156 run_proxy_with_acme_and_fallback(
157 route_table,
158 wasm_triggers,
159 wasm_invoker,
160 acme_manager,
161 domains,
162 None,
163 )
164 .await
165}
166
167#[allow(clippy::too_many_arguments)]
169pub async fn run_proxy_with_acme_and_fallback(
170 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
171 wasm_triggers: SharedWasmTriggers,
172 wasm_invoker: Option<WasmInvoker>,
173 acme_manager: AcmeManager,
174 domains: Vec<String>,
175 fallback: Option<FallbackConfig>,
176) -> anyhow::Result<SharedCertResolver> {
177 let resolver = Arc::new(acme::DynCertResolver::new());
178
179 let acme_mgr = acme_manager.clone();
180 let routes_clone = route_table.clone();
181 let triggers_clone = wasm_triggers.clone();
182 let invoker_clone = wasm_invoker.clone();
183 let fallback_http = fallback.clone();
184 let fallback_tls = fallback.clone();
185
186 let http_handle = tokio::spawn({
188 let acme = acme_mgr.clone();
189 let routes = routes_clone.clone();
190 let triggers = triggers_clone.clone();
191 let invoker = invoker_clone.clone();
192 async move {
193 if let Err(e) = run_proxy_with_fallback(
194 routes,
195 triggers,
196 invoker,
197 80,
198 None,
199 Some(acme),
200 fallback_http,
201 )
202 .await
203 {
204 error!("HTTP listener failed: {e}");
205 }
206 }
207 });
208
209 let resolver_clone = resolver.clone();
211 let https_handle = tokio::spawn(async move {
212 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213
214 const PER_DOMAIN_PROVISION_TIMEOUT: std::time::Duration =
222 std::time::Duration::from_secs(60);
223 for domain in &domains {
224 let fut = acme_mgr.ensure_cert_for_resolver(domain, &resolver_clone);
225 match tokio::time::timeout(PER_DOMAIN_PROVISION_TIMEOUT, fut).await {
226 Ok(Ok(())) => {}
227 Ok(Err(e)) => {
228 error!(domain = %domain, error = %e, "Failed to provision cert");
229 }
230 Err(_) => {
231 warn!(
232 domain = %domain,
233 timeout_secs = PER_DOMAIN_PROVISION_TIMEOUT.as_secs(),
234 "Cert provisioning timed out — skipping (HTTPS will start without this cert; reconciler may retry on demand)"
235 );
236 }
237 }
238 }
239
240 let config = rustls::ServerConfig::builder()
242 .with_no_client_auth()
243 .with_cert_resolver(resolver_clone);
244
245 let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
246 info!(
247 "Starting HTTPS with SNI resolver ({} domains)",
248 domains.len()
249 );
250
251 let routes = routes_clone;
252 let triggers = triggers_clone;
253 let invoker = invoker_clone;
254 if let Err(e) = run_proxy_with_fallback(
255 routes,
256 triggers,
257 invoker,
258 443,
259 Some(acceptor),
260 Some(acme_mgr),
261 fallback_tls,
262 )
263 .await
264 {
265 error!("HTTPS listener failed: {e}");
266 }
267 });
268
269 tokio::spawn(async move {
272 tokio::select! {
273 _ = http_handle => warn!("HTTP listener exited"),
274 _ = https_handle => warn!("HTTPS listener exited"),
275 }
276 });
277
278 Ok(resolver)
279}
280
281async fn serve_loop(
283 listener: TcpListener,
284 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
285 wasm_triggers: SharedWasmTriggers,
286 wasm_invoker: Option<WasmInvoker>,
287 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
288 acme_manager: Option<AcmeManager>,
289) -> anyhow::Result<()> {
290 serve_loop_with_fallback(
291 listener,
292 route_table,
293 wasm_triggers,
294 wasm_invoker,
295 tls_acceptor,
296 acme_manager,
297 None,
298 )
299 .await
300}
301
302#[allow(clippy::too_many_arguments)]
304pub(crate) async fn serve_loop_with_fallback(
305 listener: TcpListener,
306 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
307 wasm_triggers: SharedWasmTriggers,
308 wasm_invoker: Option<WasmInvoker>,
309 tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
310 acme_manager: Option<AcmeManager>,
311 fallback: Option<FallbackConfig>,
312) -> anyhow::Result<()> {
313 let counter = Arc::new(AtomicUsize::new(0));
314 let client = Arc::new(
315 reqwest::Client::builder()
316 .no_proxy()
317 .redirect(reqwest::redirect::Policy::none())
318 .connect_timeout(std::time::Duration::from_secs(10))
323 .timeout(std::time::Duration::from_secs(300))
324 .pool_idle_timeout(std::time::Duration::from_secs(90))
325 .build()
326 .expect("failed to build HTTP client"),
327 );
328 let acme = acme_manager.map(Arc::new);
329 let is_tls = tls_acceptor.is_some();
330 let rate_limiter = RateLimiter::new();
331
332 let fallback = Arc::new(fallback);
333 loop {
334 let (stream, peer) = match listener.accept().await {
335 Ok(conn) => conn,
336 Err(e) => {
337 warn!("Proxy accept error: {e}");
338 continue;
339 }
340 };
341
342 let routes = route_table.clone();
343 let triggers = wasm_triggers.clone();
344 let invoker = wasm_invoker.clone();
345 let counter = counter.clone();
346 let client = client.clone();
347 let acme = acme.clone();
348 let tls = tls_acceptor.clone();
349 let rl = rate_limiter.clone();
350 let fb = fallback.clone();
351 let routes_for_sni = routes.clone();
352
353 let fb_for_service = fb.clone();
354 tokio::spawn(async move {
355 let service = service_fn(move |req: Request<Incoming>| {
356 let routes = routes.clone();
357 let triggers = triggers.clone();
358 let invoker = invoker.clone();
359 let counter = counter.clone();
360 let client = client.clone();
361 let acme = acme.clone();
362 let rl = rl.clone();
363 let fb = fb_for_service.clone();
364 async move {
365 if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
366 return Ok(resp);
367 }
368 handle_request(
369 req,
370 &routes,
371 &triggers,
372 invoker.as_ref(),
373 &counter,
374 &client,
375 is_tls,
376 &rl,
377 peer,
378 fb.as_ref().as_ref(),
379 )
380 .await
381 }
382 });
383 if let Some(acceptor) = tls {
384 let mut stream = stream;
385 let sni = sni::peek_sni(&mut stream).await;
387 let should_passthrough = if let Some(ref host) = sni {
388 let routes_lock = routes_for_sni.read().await;
389 let known = routes_lock.contains_key(host);
390 drop(routes_lock);
391 !known && fb.as_ref().as_ref().and_then(|f| f.tls.as_ref()).is_some()
392 } else {
393 false
394 };
395
396 if should_passthrough {
397 let target = fb
398 .as_ref()
399 .as_ref()
400 .and_then(|f| f.tls.clone())
401 .expect("checked above");
402 debug!(?sni, %target, "SNI passthrough");
403 match tokio::net::TcpStream::connect(&target).await {
404 Ok(mut backend) => {
405 if let Err(e) =
406 tokio::io::copy_bidirectional(&mut stream, &mut backend).await
407 {
408 debug!("Passthrough copy error from {peer}: {e}");
409 }
410 }
411 Err(e) => warn!("Failed to connect to TLS fallback {target}: {e}"),
412 }
413 return;
414 }
415
416 match acceptor.accept(stream).await {
417 Ok(tls_stream) => {
418 let io = TokioIo::new(tls_stream);
419 if let Err(e) = http1::Builder::new()
420 .serve_connection(io, service)
421 .with_upgrades()
422 .await
423 {
424 debug!("TLS proxy error from {peer}: {e}");
425 }
426 }
427 Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
428 }
429 } else {
430 let io = TokioIo::new(stream);
431 if let Err(e) = http1::Builder::new()
432 .serve_connection(io, service)
433 .with_upgrades()
434 .await
435 {
436 debug!("Proxy connection error from {peer}: {e}");
437 }
438 }
439 });
440 }
441}