Skip to main content

orca_proxy/
lib.rs

//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
//!
//! Routes HTTP traffic by `Host` header to container backends (round-robin),
//! and by path pattern to Wasm component invocations via a callback.
//! Supports automatic TLS via ACME/Let's Encrypt (Caddy-style zero-config).
6
7pub mod acme;
8mod body;
9mod forward;
10mod handler;
11pub mod rate_limit;
12mod routing;
13pub mod sni;
14pub mod tls;
15mod websocket;
16
17pub use orca_core::config::FallbackConfig;
18
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::sync::atomic::AtomicUsize;
22
23use hyper::Request;
24use hyper::body::Incoming;
25use hyper::server::conn::http1;
26use hyper::service::service_fn;
27use hyper_util::rt::TokioIo;
28use tokio::net::TcpListener;
29use tokio::sync::RwLock;
30use tracing::{debug, error, info, warn};
31
32use acme::AcmeManager;
33use handler::{handle_acme_challenge, handle_request};
34use rate_limit::RateLimiter;
35
/// One upstream backend that container traffic for a domain can be sent to.
#[derive(Clone, Debug)]
pub struct RouteTarget {
    /// Backend socket address, written as `ip:port`.
    pub address: String,
    /// Name of the service this backend belongs to.
    pub service_name: String,
    /// Path pattern served by this target (for example `"/api/*"`).
    ///
    /// `None` makes the target a catch-all for its domain; `Some` restricts
    /// it to requests whose path matches the pattern. When several patterns
    /// match, the longest prefix wins.
    pub path_pattern: Option<String>,
    /// Relative share of traffic, 1-100 (default 100). A larger weight means
    /// more traffic; used for weighted routing during canary deployments.
    pub weight: u32,
    /// Leading path segment removed before the request is forwarded upstream
    /// (for example `"/admin"`).
    ///
    /// Given `path_pattern = "/admin/*"` and `strip_prefix = Some("/admin")`,
    /// a request for `/admin/users` reaches the backend as `/users`, matching
    /// the semantics of Caddy's `handle_path`.
    pub strip_prefix: Option<String>,
}
56
/// Binding between an HTTP path pattern and the Wasm runtime instance that
/// handles requests matching it.
#[derive(Clone, Debug)]
pub struct WasmTrigger {
    /// Path pattern that activates this trigger (for example `"/api/edge/*"`).
    pub pattern: String,
    /// Identifier of the Wasm runtime instance to invoke.
    pub runtime_id: String,
    /// Name of the owning service; used for logging.
    pub service_name: String,
}
67
/// Boxed, `Send` future handed back by a [`WasmInvoker`] call.
///
/// `Ok` carries the response body string; `Err` carries a `String`
/// (presumably an error description — confirm against the invoker
/// implementation).
pub type WasmInvokeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;

/// Shared callback that runs a Wasm component when a request matches a
/// trigger.
///
/// The four `String` arguments are, in order: runtime id, HTTP method,
/// request path, and request body.
pub type WasmInvoker =
    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
76
77/// Shared Wasm trigger table type.
78pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
79
80/// Run the reverse proxy on the given port.
81pub async fn run_proxy(
82    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
83    wasm_triggers: SharedWasmTriggers,
84    wasm_invoker: Option<WasmInvoker>,
85    port: u16,
86    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
87    acme_manager: Option<AcmeManager>,
88) -> anyhow::Result<()> {
89    let addr = format!("0.0.0.0:{port}");
90    let listener = TcpListener::bind(&addr).await?;
91    let proto = if tls_acceptor.is_some() {
92        "HTTPS"
93    } else {
94        "HTTP"
95    };
96    info!("Reverse proxy listening on {addr} ({proto})");
97
98    serve_loop(
99        listener,
100        route_table,
101        wasm_triggers,
102        wasm_invoker,
103        tls_acceptor,
104        acme_manager,
105    )
106    .await
107}
108
109/// Run the proxy with optional fallback support.
110#[allow(clippy::too_many_arguments)]
111pub async fn run_proxy_with_fallback(
112    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
113    wasm_triggers: SharedWasmTriggers,
114    wasm_invoker: Option<WasmInvoker>,
115    port: u16,
116    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
117    acme_manager: Option<AcmeManager>,
118    fallback: Option<FallbackConfig>,
119) -> anyhow::Result<()> {
120    let addr = format!("0.0.0.0:{port}");
121    let listener = TcpListener::bind(&addr).await?;
122    let proto = if tls_acceptor.is_some() {
123        "HTTPS"
124    } else {
125        "HTTP"
126    };
127    info!("Reverse proxy listening on {addr} ({proto})");
128
129    serve_loop_with_fallback(
130        listener,
131        route_table,
132        wasm_triggers,
133        wasm_invoker,
134        tls_acceptor,
135        acme_manager,
136        fallback,
137    )
138    .await
139}
140
141/// Shared dynamic cert resolver for hot-provisioning.
142pub type SharedCertResolver = Arc<acme::DynCertResolver>;
143
144/// Run HTTP on port 80 (for ACME challenges + redirect) and HTTPS on port 443.
145///
146/// Automatically provisions certs for all given domains via Let's Encrypt.
147/// Returns a `SharedCertResolver` that can be used to hot-provision certs
148/// for new domains added later via `orca deploy`.
149pub async fn run_proxy_with_acme(
150    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
151    wasm_triggers: SharedWasmTriggers,
152    wasm_invoker: Option<WasmInvoker>,
153    acme_manager: AcmeManager,
154    domains: Vec<String>,
155) -> anyhow::Result<SharedCertResolver> {
156    run_proxy_with_acme_and_fallback(
157        route_table,
158        wasm_triggers,
159        wasm_invoker,
160        acme_manager,
161        domains,
162        None,
163    )
164    .await
165}
166
167/// Run HTTP+HTTPS with ACME and optional fallback to another reverse proxy.
168#[allow(clippy::too_many_arguments)]
169pub async fn run_proxy_with_acme_and_fallback(
170    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
171    wasm_triggers: SharedWasmTriggers,
172    wasm_invoker: Option<WasmInvoker>,
173    acme_manager: AcmeManager,
174    domains: Vec<String>,
175    fallback: Option<FallbackConfig>,
176) -> anyhow::Result<SharedCertResolver> {
177    let resolver = Arc::new(acme::DynCertResolver::new());
178
179    let acme_mgr = acme_manager.clone();
180    let routes_clone = route_table.clone();
181    let triggers_clone = wasm_triggers.clone();
182    let invoker_clone = wasm_invoker.clone();
183    let fallback_http = fallback.clone();
184    let fallback_tls = fallback.clone();
185
186    // Start HTTP on port 80 first (needed for ACME challenge validation)
187    let http_handle = tokio::spawn({
188        let acme = acme_mgr.clone();
189        let routes = routes_clone.clone();
190        let triggers = triggers_clone.clone();
191        let invoker = invoker_clone.clone();
192        async move {
193            if let Err(e) = run_proxy_with_fallback(
194                routes,
195                triggers,
196                invoker,
197                80,
198                None,
199                Some(acme),
200                fallback_http,
201            )
202            .await
203            {
204                error!("HTTP listener failed: {e}");
205            }
206        }
207    });
208
209    // Provision certs for initial domains, then start HTTPS with SNI resolver
210    let resolver_clone = resolver.clone();
211    let https_handle = tokio::spawn(async move {
212        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
213
214        // Provision all initial domain certs. Each call gets its own 60s
215        // timeout: without it, a single domain whose LE HTTP-01 challenge
216        // hangs (DNS pointing elsewhere, port 80 firewalled, LE rate limit
217        // backoff) blocks the entire HTTPS listener startup forever. 60s is
218        // generous — a healthy LE order completes in 5-15s — so a timeout
219        // here is a real problem, but we'd rather serve the other domains
220        // than serve nothing.
221        const PER_DOMAIN_PROVISION_TIMEOUT: std::time::Duration =
222            std::time::Duration::from_secs(60);
223        for domain in &domains {
224            let fut = acme_mgr.ensure_cert_for_resolver(domain, &resolver_clone);
225            match tokio::time::timeout(PER_DOMAIN_PROVISION_TIMEOUT, fut).await {
226                Ok(Ok(())) => {}
227                Ok(Err(e)) => {
228                    error!(domain = %domain, error = %e, "Failed to provision cert");
229                }
230                Err(_) => {
231                    warn!(
232                        domain = %domain,
233                        timeout_secs = PER_DOMAIN_PROVISION_TIMEOUT.as_secs(),
234                        "Cert provisioning timed out — skipping (HTTPS will start without this cert; reconciler may retry on demand)"
235                    );
236                }
237            }
238        }
239
240        // Build TlsAcceptor with SNI resolver for multi-domain support
241        let config = rustls::ServerConfig::builder()
242            .with_no_client_auth()
243            .with_cert_resolver(resolver_clone);
244
245        let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
246        info!(
247            "Starting HTTPS with SNI resolver ({} domains)",
248            domains.len()
249        );
250
251        let routes = routes_clone;
252        let triggers = triggers_clone;
253        let invoker = invoker_clone;
254        if let Err(e) = run_proxy_with_fallback(
255            routes,
256            triggers,
257            invoker,
258            443,
259            Some(acceptor),
260            Some(acme_mgr),
261            fallback_tls,
262        )
263        .await
264        {
265            error!("HTTPS listener failed: {e}");
266        }
267    });
268
269    // Don't block — return the resolver so the control plane can hot-add certs.
270    // The HTTP and HTTPS listeners run in the background.
271    tokio::spawn(async move {
272        tokio::select! {
273            _ = http_handle => warn!("HTTP listener exited"),
274            _ = https_handle => warn!("HTTPS listener exited"),
275        }
276    });
277
278    Ok(resolver)
279}
280
281/// Core accept loop shared by HTTP and HTTPS listeners.
282async fn serve_loop(
283    listener: TcpListener,
284    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
285    wasm_triggers: SharedWasmTriggers,
286    wasm_invoker: Option<WasmInvoker>,
287    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
288    acme_manager: Option<AcmeManager>,
289) -> anyhow::Result<()> {
290    serve_loop_with_fallback(
291        listener,
292        route_table,
293        wasm_triggers,
294        wasm_invoker,
295        tls_acceptor,
296        acme_manager,
297        None,
298    )
299    .await
300}
301
302/// Serve loop variant with fallback support for SNI passthrough and HTTP forwarding.
303#[allow(clippy::too_many_arguments)]
304pub(crate) async fn serve_loop_with_fallback(
305    listener: TcpListener,
306    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
307    wasm_triggers: SharedWasmTriggers,
308    wasm_invoker: Option<WasmInvoker>,
309    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
310    acme_manager: Option<AcmeManager>,
311    fallback: Option<FallbackConfig>,
312) -> anyhow::Result<()> {
313    let counter = Arc::new(AtomicUsize::new(0));
314    let client = Arc::new(
315        reqwest::Client::builder()
316            .no_proxy()
317            .redirect(reqwest::redirect::Policy::none())
318            // Timeouts are mandatory: without them, a hung upstream (slow
319            // backend, dead fallback, slowloris) parks the per-request task
320            // forever. We accept many in-flight tasks but every one must
321            // eventually complete so the proxy can recover without a restart.
322            .connect_timeout(std::time::Duration::from_secs(10))
323            .timeout(std::time::Duration::from_secs(300))
324            .pool_idle_timeout(std::time::Duration::from_secs(90))
325            .build()
326            .expect("failed to build HTTP client"),
327    );
328    let acme = acme_manager.map(Arc::new);
329    let is_tls = tls_acceptor.is_some();
330    let rate_limiter = RateLimiter::new();
331
332    let fallback = Arc::new(fallback);
333    loop {
334        let (stream, peer) = match listener.accept().await {
335            Ok(conn) => conn,
336            Err(e) => {
337                warn!("Proxy accept error: {e}");
338                continue;
339            }
340        };
341
342        let routes = route_table.clone();
343        let triggers = wasm_triggers.clone();
344        let invoker = wasm_invoker.clone();
345        let counter = counter.clone();
346        let client = client.clone();
347        let acme = acme.clone();
348        let tls = tls_acceptor.clone();
349        let rl = rate_limiter.clone();
350        let fb = fallback.clone();
351        let routes_for_sni = routes.clone();
352
353        let fb_for_service = fb.clone();
354        tokio::spawn(async move {
355            let service = service_fn(move |req: Request<Incoming>| {
356                let routes = routes.clone();
357                let triggers = triggers.clone();
358                let invoker = invoker.clone();
359                let counter = counter.clone();
360                let client = client.clone();
361                let acme = acme.clone();
362                let rl = rl.clone();
363                let fb = fb_for_service.clone();
364                async move {
365                    if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
366                        return Ok(resp);
367                    }
368                    handle_request(
369                        req,
370                        &routes,
371                        &triggers,
372                        invoker.as_ref(),
373                        &counter,
374                        &client,
375                        is_tls,
376                        &rl,
377                        peer,
378                        fb.as_ref().as_ref(),
379                    )
380                    .await
381                }
382            });
383            if let Some(acceptor) = tls {
384                let mut stream = stream;
385                // Peek SNI to decide between local TLS termination and pass-through
386                let sni = sni::peek_sni(&mut stream).await;
387                let should_passthrough = if let Some(ref host) = sni {
388                    let routes_lock = routes_for_sni.read().await;
389                    let known = routes_lock.contains_key(host);
390                    drop(routes_lock);
391                    !known && fb.as_ref().as_ref().and_then(|f| f.tls.as_ref()).is_some()
392                } else {
393                    false
394                };
395
396                if should_passthrough {
397                    let target = fb
398                        .as_ref()
399                        .as_ref()
400                        .and_then(|f| f.tls.clone())
401                        .expect("checked above");
402                    debug!(?sni, %target, "SNI passthrough");
403                    match tokio::net::TcpStream::connect(&target).await {
404                        Ok(mut backend) => {
405                            if let Err(e) =
406                                tokio::io::copy_bidirectional(&mut stream, &mut backend).await
407                            {
408                                debug!("Passthrough copy error from {peer}: {e}");
409                            }
410                        }
411                        Err(e) => warn!("Failed to connect to TLS fallback {target}: {e}"),
412                    }
413                    return;
414                }
415
416                match acceptor.accept(stream).await {
417                    Ok(tls_stream) => {
418                        let io = TokioIo::new(tls_stream);
419                        if let Err(e) = http1::Builder::new()
420                            .serve_connection(io, service)
421                            .with_upgrades()
422                            .await
423                        {
424                            debug!("TLS proxy error from {peer}: {e}");
425                        }
426                    }
427                    Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
428                }
429            } else {
430                let io = TokioIo::new(stream);
431                if let Err(e) = http1::Builder::new()
432                    .serve_connection(io, service)
433                    .with_upgrades()
434                    .await
435                {
436                    debug!("Proxy connection error from {peer}: {e}");
437                }
438            }
439        });
440    }
441}